Source code for variationist.metrics.pmi

import math
import numpy as np
import pandas as pd
from collections import Counter
from itertools import islice
from tqdm import tqdm

from variationist.metrics import shared_metrics, lexical_artifacts


def safe_divide(numerator, denominator):
    """Divide two numbers, yielding 0 instead of raising on a zero denominator.

    Parameters
    ----------
    numerator:
        The dividend.
    denominator:
        The divisor. When it compares equal to zero, no division is attempted.

    Returns
    -------
    result:
        ``numerator / denominator``, or the integer ``0`` if the denominator is zero.
    """
    # An int 0 and a float 0.0 compare equal, so a single check covers both.
    return 0 if denominator == 0 else numerator / denominator
def take(n, iterable):
    """Collect at most the first ``n`` items of ``iterable`` into a list.

    Parameters
    ----------
    n: int
        Maximum number of items to collect.
    iterable:
        Any iterable; it is consumed only as far as needed.

    Returns
    -------
    items: list
        The leading items, fewer than ``n`` if the iterable is shorter.
    """
    leading_items = islice(iterable, n)
    return list(leading_items)
def get_total(freqs_merged_dict):
    """Sum the frequencies stored in a token-to-frequency dictionary.

    Parameters
    ----------
    freqs_merged_dict: Dict
        A dictionary mapping each token to its frequency.

    Returns
    -------
    total:
        The sum of all the values, or ``0`` for an empty dictionary.
    """
    return sum(freqs_merged_dict.values())
def create_pmi_dictionary(label_values_dict, subsets_of_interest, weighted, freq_cutoff):
    """Build the per-label PMI table used by the PMI-based metrics in this module.

    Parameters
    ----------
    label_values_dict: Dict
        A dictionary containing all of the possible values each variable can take.
        Only its keys and the number of values per key are used here.
    subsets_of_interest: Dict
        For each key of ``label_values_dict``, an indexable collection of subsets.
        Each subset must expose a ``name`` attribute (used as the label) and be
        iterable over its entries.
    weighted: bool
        If true, each PMI value is multiplied by the token's frequency in the label.
    freq_cutoff:
        Tokens whose overall frequency is below this value receive no PMI score.

    Returns
    -------
    output_pmi: Dict
        Maps ``str(label)`` to a dictionary of token -> PMI value, ordered from the
        highest to the lowest value.
    """
    per_label_freqs = {}
    overall_freqs = {}
    entries_per_label = {}

    for column in label_values_dict:
        # tqdm only wraps the index range to display progress over the subsets.
        for idx in tqdm(range(len(label_values_dict[column]))):
            subset = subsets_of_interest[column][idx]
            curr_label = subset.name
            # Presumably a token -> frequency mapping for this subset; it is used as such below.
            subset_freqs = shared_metrics.get_all_frequencies(subset)
            per_label_freqs[curr_label] = subset_freqs

            # Accumulate the frequencies of every token across all subsets.
            for tok, count in subset_freqs.items():
                overall_freqs[tok] = overall_freqs.get(tok, 0) + count

            # Count how many entries the subset holds (one per iteration step).
            for _ in subset:
                entries_per_label[curr_label] = entries_per_label.get(curr_label, 0) + 1

    # The total is taken before filtering, so the cutoff does not change the denominators.
    total = get_total(overall_freqs)
    kept_freqs = {
        tok: count
        for tok, count in overall_freqs.items()
        if count >= freq_cutoff
    }

    output_pmi = {}
    for label, label_freqs in per_label_freqs.items():
        scores = {}
        for tok, joint_count in label_freqs.items():
            if tok not in kept_freqs:
                continue
            pxy = joint_count / total
            # NOTE(review): px is based on the number of entries in the subset while
            # `total` sums token frequencies -- confirm this mix of units is intended.
            px = entries_per_label[label] / total
            py = kept_freqs[tok] / total
            score = math.log2(safe_divide(pxy, (px * py)))
            if weighted:
                score = score * joint_count
            scores[tok] = score

        # Highest PMI first; ties keep their original relative order.
        ranked = sorted(scores.items(), key=lambda pair: pair[1], reverse=True)
        output_pmi[str(label)] = dict(ranked)

    return output_pmi
def pmi(label_values_dict, subsets_of_interest, args):
    """Function to calculate (unweighted) PMI.

    Parameters
    ----------
    label_values_dict: Dict
        A dictionary containing all of the possible values each variable can take in
        the input dataset.
    subsets_of_interest: Dict
        A dictionary containing a pandas series with tokenized texts for each
        variable/text column combination out of the variables and text columns
        specified by the user.
    args: InspectorArgs
        The arguments selected by the user. Only ``args.freq_cutoff`` is read here.

    Returns
    -------
    output_pmi: Dict
        A dictionary with the pmi for each token in each subset of interest.
    """
    return create_pmi_dictionary(
        label_values_dict,
        subsets_of_interest,
        weighted=False,
        freq_cutoff=args.freq_cutoff,
    )
def pmi_normalized(label_values_dict, subsets_of_interest, args):
    """Function to calculate normalized PMI.

    PMI values are min-max rescaled using the global minimum and maximum found
    across all subsets.

    Parameters
    ----------
    label_values_dict: Dict
        A dictionary containing all of the possible values each variable can take in
        the input dataset.
    subsets_of_interest: Dict
        A dictionary containing a pandas series with tokenized texts for each
        variable/text column combination out of the variables and text columns
        specified by the user.
    args: InspectorArgs
        The arguments selected by the user. Only ``args.freq_cutoff`` is read here.

    Returns
    -------
    output_pmi: Dict
        A dictionary with the normalized pmi for each token in each subset of
        interest. If no token has a PMI value, the (empty) per-label dictionaries
        are returned unchanged.
    """
    output_pmi = create_pmi_dictionary(
        label_values_dict, subsets_of_interest, False, args.freq_cutoff
    )

    non_empty = [scores for scores in output_pmi.values() if scores]
    if not non_empty:
        # min()/max() raise ValueError on an empty sequence; there is nothing to rescale.
        return output_pmi

    min_value = min(min(scores.values()) for scores in non_empty)
    max_value = max(max(scores.values()) for scores in non_empty)
    value_range = max_value - min_value

    for scores in non_empty:
        for w in scores:
            # safe_divide yields 0 when all values are identical (zero range),
            # matching the behaviour of the positive normalized variants.
            scores[w] = safe_divide(scores[w] - min_value, value_range)

    return output_pmi
def pmi_positive(label_values_dict, subsets_of_interest, args):
    """Function to calculate positive PMI (negative values are set to 0).

    Parameters
    ----------
    label_values_dict: Dict
        A dictionary containing all of the possible values each variable can take in
        the input dataset.
    subsets_of_interest: Dict
        A dictionary containing a pandas series with tokenized texts for each
        variable/text column combination out of the variables and text columns
        specified by the user.
    args: InspectorArgs
        The arguments selected by the user. Only ``args.freq_cutoff`` is read here.

    Returns
    -------
    output_pmi: Dict
        A dictionary with the positive PMI for each token in each subset of interest.
    """
    output_pmi = create_pmi_dictionary(
        label_values_dict,
        subsets_of_interest,
        weighted=False,
        freq_cutoff=args.freq_cutoff,
    )

    for scores in output_pmi.values():
        # Only values are overwritten, so iterating over items() while assigning is safe.
        for token, value in scores.items():
            if value < 0:
                scores[token] = 0

    return output_pmi
def pmi_positive_normalized(label_values_dict, subsets_of_interest, args):
    """Function to calculate positive normalized PMI.

    Negative values are set to 0 and all values are then min-max rescaled between
    0 and 1 using the global minimum and maximum across all subsets.

    Parameters
    ----------
    label_values_dict: Dict
        A dictionary containing all of the possible values each variable can take in
        the input dataset.
    subsets_of_interest: Dict
        A dictionary containing a pandas series with tokenized texts for each
        variable/text column combination out of the variables and text columns
        specified by the user.
    args: InspectorArgs
        The arguments selected by the user. Only ``args.freq_cutoff`` is read here.

    Returns
    -------
    output_pmi: Dict
        A dictionary with the positive normalized PMI for each token in each subset
        of interest. If no token has a PMI value, the (empty) per-label dictionaries
        are returned unchanged.
    """
    output_pmi = create_pmi_dictionary(
        label_values_dict, subsets_of_interest, False, args.freq_cutoff
    )

    # Clamp negatives first so that the extremes below are taken on the clamped values.
    for scores in output_pmi.values():
        for w, value in scores.items():
            if value < 0:
                scores[w] = 0

    non_empty = [scores for scores in output_pmi.values() if scores]
    if not non_empty:
        # min()/max() raise ValueError on an empty sequence; there is nothing to rescale.
        return output_pmi

    min_value = min(min(scores.values()) for scores in non_empty)
    max_value = max(max(scores.values()) for scores in non_empty)
    value_range = max_value - min_value

    for scores in non_empty:
        for w in scores:
            # safe_divide yields 0 when all values are identical (zero range).
            scores[w] = safe_divide(scores[w] - min_value, value_range)

    return output_pmi
def pmi_weighted(label_values_dict, subsets_of_interest, args):
    """Function to calculate weighted PMI.

    Parameters
    ----------
    label_values_dict: Dict
        A dictionary containing all of the possible values each variable can take in
        the input dataset.
    subsets_of_interest: Dict
        A dictionary containing a pandas series with tokenized texts for each
        variable/text column combination out of the variables and text columns
        specified by the user.
    args: InspectorArgs
        The arguments selected by the user. Only ``args.freq_cutoff`` is read here.

    Returns
    -------
    output_pmi: Dict
        A dictionary with the weighted PMI for each token in each subset of interest.
    """
    return create_pmi_dictionary(
        label_values_dict,
        subsets_of_interest,
        weighted=True,
        freq_cutoff=args.freq_cutoff,
    )
def pmi_normalized_weighted(label_values_dict, subsets_of_interest, args):
    """Function to calculate normalized weighted PMI.

    Weighted PMI values are min-max rescaled using the global minimum and maximum
    found across all subsets.

    Parameters
    ----------
    label_values_dict: Dict
        A dictionary containing all of the possible values each variable can take in
        the input dataset.
    subsets_of_interest: Dict
        A dictionary containing a pandas series with tokenized texts for each
        variable/text column combination out of the variables and text columns
        specified by the user.
    args: InspectorArgs
        The arguments selected by the user. Only ``args.freq_cutoff`` is read here.

    Returns
    -------
    output_pmi: Dict
        A dictionary with the normalized weighted PMI for each token in each subset
        of interest. If no token has a PMI value, the (empty) per-label dictionaries
        are returned unchanged.
    """
    output_pmi = create_pmi_dictionary(
        label_values_dict, subsets_of_interest, True, args.freq_cutoff
    )

    non_empty = [scores for scores in output_pmi.values() if scores]
    if not non_empty:
        # min()/max() raise ValueError on an empty sequence; there is nothing to rescale.
        return output_pmi

    min_value = min(min(scores.values()) for scores in non_empty)
    max_value = max(max(scores.values()) for scores in non_empty)
    value_range = max_value - min_value

    for scores in non_empty:
        for w in scores:
            # safe_divide yields 0 when all values are identical (zero range),
            # matching the behaviour of the positive normalized variants.
            scores[w] = safe_divide(scores[w] - min_value, value_range)

    return output_pmi
def pmi_positive_weighted(label_values_dict, subsets_of_interest, args):
    """Function to calculate positive weighted PMI (negative values are set to 0).

    Parameters
    ----------
    label_values_dict: Dict
        A dictionary containing all of the possible values each variable can take in
        the input dataset.
    subsets_of_interest: Dict
        A dictionary containing a pandas series with tokenized texts for each
        variable/text column combination out of the variables and text columns
        specified by the user.
    args: InspectorArgs
        The arguments selected by the user. Only ``args.freq_cutoff`` is read here.

    Returns
    -------
    output_pmi: Dict
        A dictionary with the positive weighted PMI for each token in each subset of
        interest.
    """
    output_pmi = create_pmi_dictionary(
        label_values_dict,
        subsets_of_interest,
        weighted=True,
        freq_cutoff=args.freq_cutoff,
    )

    for scores in output_pmi.values():
        # Only values are overwritten, so iterating over items() while assigning is safe.
        for token, value in scores.items():
            if value < 0:
                scores[token] = 0

    return output_pmi
def pmi_positive_normalized_weighted(label_values_dict, subsets_of_interest, args):
    """Function to calculate positive normalized weighted PMI.

    Negative weighted PMI values are set to 0 and all values are then min-max
    rescaled between 0 and 1 using the global minimum and maximum across all subsets.

    Parameters
    ----------
    label_values_dict: Dict
        A dictionary containing all of the possible values each variable can take in
        the input dataset.
    subsets_of_interest: Dict
        A dictionary containing a pandas series with tokenized texts for each
        variable/text column combination out of the variables and text columns
        specified by the user.
    args: InspectorArgs
        The arguments selected by the user. Only ``args.freq_cutoff`` is read here.

    Returns
    -------
    output_pmi: Dict
        A dictionary with the positive normalized weighted PMI for each token in each
        subset of interest. If no token has a PMI value, the (empty) per-label
        dictionaries are returned unchanged.
    """
    output_pmi = create_pmi_dictionary(
        label_values_dict, subsets_of_interest, True, args.freq_cutoff
    )

    # Clamp negatives first so that the extremes below are taken on the clamped values.
    for scores in output_pmi.values():
        for w, value in scores.items():
            if value < 0:
                scores[w] = 0

    non_empty = [scores for scores in output_pmi.values() if scores]
    if not non_empty:
        # min()/max() raise ValueError on an empty sequence; there is nothing to rescale.
        return output_pmi

    min_value = min(min(scores.values()) for scores in non_empty)
    max_value = max(max(scores.values()) for scores in non_empty)
    value_range = max_value - min_value

    for scores in non_empty:
        for w in scores:
            # safe_divide yields 0 when all values are identical (zero range).
            scores[w] = safe_divide(scores[w] - min_value, value_range)

    return output_pmi
def class_relevance_positive_normalized(label_values_dict, subsets_of_interest, args):
    """Function to calculate a PMI-based class relevance metric, which consists in
    normalizing by subset the positive PMI values.

    For each subset, negative PMI values are first set to 0 and the resulting values
    are then min-max rescaled using that subset's own minimum and maximum.

    Parameters
    ----------
    label_values_dict: Dict
        A dictionary containing all of the possible values each variable can take in
        the input dataset.
    subsets_of_interest: Dict
        A dictionary containing a pandas series with tokenized texts for each
        variable/text column combination out of the variables and text columns
        specified by the user.
    args: InspectorArgs
        The arguments selected by the user. Only ``args.freq_cutoff`` is read here.

    Returns
    -------
    output_pmi: Dict
        A dictionary with the positive normalized class relevance metric for each
        token in each subset of interest.
    """
    output_pmi = create_pmi_dictionary(
        label_values_dict, subsets_of_interest, False, args.freq_cutoff
    )

    for scores in output_pmi.values():
        if not scores:
            continue  # nothing to rescale for a subset without scored tokens

        # The clamp must happen BEFORE rescaling: min-max rescaled values already lie
        # in [0, 1], so clamping afterwards (as previously done) could never trigger.
        for w, value in scores.items():
            if value < 0:
                scores[w] = 0

        min_value = min(scores.values())
        max_value = max(scores.values())
        value_range = max_value - min_value
        for w in scores:
            # safe_divide yields 0 when all values in the subset are identical.
            scores[w] = safe_divide(scores[w] - min_value, value_range)

    return output_pmi
def class_relevance_normalized_weighted(label_values_dict, subsets_of_interest, args):
    """Function to calculate a PMI-based class relevance metric, which consists in
    normalizing by subset the weighted PMI values.

    Each subset is min-max rescaled independently, using its own minimum and maximum.

    Parameters
    ----------
    label_values_dict: Dict
        A dictionary containing all of the possible values each variable can take in
        the input dataset.
    subsets_of_interest: Dict
        A dictionary containing a pandas series with tokenized texts for each
        variable/text column combination out of the variables and text columns
        specified by the user.
    args: InspectorArgs
        The arguments selected by the user. Only ``args.freq_cutoff`` is read here.

    Returns
    -------
    output_pmi: Dict
        A dictionary with the normalized weighted class relevance metric for each
        token in each subset of interest.
    """
    output_pmi = create_pmi_dictionary(
        label_values_dict,
        subsets_of_interest,
        weighted=True,
        freq_cutoff=args.freq_cutoff,
    )

    for scores in output_pmi.values():
        if not scores:
            continue  # subsets without scored tokens are left untouched

        # Extremes are captured once, before any value of this subset is overwritten.
        min_value = min(scores.values())
        max_value = max(scores.values())
        for token in scores:
            scores[token] = safe_divide(
                (scores[token] - min_value), (max_value - min_value)
            )

    return output_pmi
def class_relevance_positive_normalized_weighted(label_values_dict, subsets_of_interest, args):
    """Function to calculate a PMI-based class relevance metric, which consists in
    normalizing by subset the positive weighted PMI values.

    For each subset, negative weighted PMI values are first set to 0 and the
    resulting values are then min-max rescaled using that subset's own minimum and
    maximum.

    Parameters
    ----------
    label_values_dict: Dict
        A dictionary containing all of the possible values each variable can take in
        the input dataset.
    subsets_of_interest: Dict
        A dictionary containing a pandas series with tokenized texts for each
        variable/text column combination out of the variables and text columns
        specified by the user.
    args: InspectorArgs
        The arguments selected by the user. Only ``args.freq_cutoff`` is read here.

    Returns
    -------
    output_pmi: Dict
        A dictionary with the positive normalized weighted class relevance metric for
        each token in each subset of interest.
    """
    output_pmi = create_pmi_dictionary(
        label_values_dict, subsets_of_interest, True, args.freq_cutoff
    )

    for scores in output_pmi.values():
        if not scores:
            continue  # nothing to rescale for a subset without scored tokens

        # The clamp must happen BEFORE rescaling: min-max rescaled values already lie
        # in [0, 1], so clamping afterwards (as previously done) could never trigger
        # and the result was the same as the non-positive normalized weighted variant.
        for w, value in scores.items():
            if value < 0:
                scores[w] = 0

        min_value = min(scores.values())
        max_value = max(scores.values())
        value_range = max_value - min_value
        for w in scores:
            # safe_divide yields 0 when all values in the subset are identical.
            scores[w] = safe_divide(scores[w] - min_value, value_range)

    return output_pmi
def pmi_lexical_artifacts(label_values_dict, subsets_of_interest, args):
    """Function to calculate a PMI-based class relevance metric as illustrated in
    Ramponi and Tonelli (2022).

    Parameters
    ----------
    label_values_dict: Dict
        A dictionary containing all of the possible values each variable can take in
        the input dataset.
    subsets_of_interest: Dict
        A dictionary containing a pandas series with tokenized texts for each
        variable/text column combination out of the variables and text columns
        specified by the user.
    args: InspectorArgs
        The arguments selected by the user. Not read by this function.

    Returns
    -------
    lexical_artifacts_dict: Dict
        A dictionary with the associated lexical-artifacts scores for each token in
        each subset.
    """
    # Flatten every subset into two parallel lists: one text and one label per entry.
    texts_list = []
    labels_list = []
    for column in label_values_dict:
        for idx in range(len(label_values_dict[column])):
            subset = subsets_of_interest[column][idx]
            label_name = str(subset.name)
            for text in subset:
                # Each entry is a sequence of tokens, re-joined into a single string.
                texts_list.append(" ".join(text))
                labels_list.append(label_name)

    lexical_artifacts_dict = {}
    # dict.fromkeys de-duplicates the labels while keeping their first-seen order.
    for label in dict.fromkeys(labels_list):
        token_scores = {}
        lexical_artifacts_dict[label] = token_scores
        values_df = lexical_artifacts.compute(
            texts=texts_list,
            labels=labels_list,
            label_of_interest=label,
        )
        # Every row is kept; the score is read from the first column of the result.
        for token, row in values_df.iterrows():
            token_scores[token] = row[values_df.columns[0]]

    return lexical_artifacts_dict