diff --git a/utils/__init__.py b/utils/__init__.py
new file mode 100644
index 0000000..06d7405
Binary files /dev/null and b/utils/__init__.py differ
diff --git a/utils/config.py b/utils/config.py
new file mode 100644
index 0000000..4aae1d9
--- /dev/null
+++ b/utils/config.py
@@ -0,0 +1,46 @@
+"""
+FScanpy Configuration Module
+"""
+import os
+
+class BaseConfig:
+    """Base configuration class with virtual paths"""
+
+    # Virtual data paths
+    DATA_DIR = "/path/to/data"
+    TRAIN_DATA = "/path/to/data/merged_train_data.csv"
+    TEST_DATA = "/path/to/data/merged_test_data.csv"
+    VALIDATION_DATA = "/path/to/data/merged_validation_data.csv"
+
+    # Virtual model paths
+    MODEL_DIR = "/path/to/models"
+    BILSTM_MODEL_DIR = "/path/to/models/bilstm"
+    GB_MODEL_DIR = "/path/to/models/gradient_boosting"
+
+    # Virtual result paths
+    RESULT_DIR = "/path/to/results"
+    BILSTM_DIR = "/path/to/results/bilstm"
+    GB_DIR = "/path/to/results/gradient_boosting"
+    MFEGB_DIR = "/path/to/results/mfe_gb"
+
+    # Virtual log paths (for minimal logging if needed)
+    LOG_DIR = "/path/to/logs"
+    BILSTM_LOG_DIR = "/path/to/logs/bilstm"
+    MFEGB_LOG_DIR = "/path/to/logs/mfe_gb"
+
+    # Virtual plot paths (not used in the sanitized version)
+    PLOT_DIR = "/path/to/plots"
+    BILSTM_PLOT_DIR = "/path/to/plots/bilstm"
+
+    @classmethod
+    def create_directories(cls):
+        """Create necessary directories (virtual implementation)"""
+        # In the actual implementation, this creates the directories;
+        # for publication, the paths above are placeholders.
+        directories = [
+            cls.DATA_DIR, cls.MODEL_DIR, cls.RESULT_DIR, cls.LOG_DIR,
+            cls.BILSTM_MODEL_DIR, cls.GB_MODEL_DIR, cls.BILSTM_DIR,
+            cls.GB_DIR, cls.MFEGB_DIR, cls.BILSTM_LOG_DIR, cls.MFEGB_LOG_DIR
+        ]
+        for directory in directories:
+            os.makedirs(directory, exist_ok=True)
diff --git a/utils/function.py b/utils/function.py
new file mode 100644
index 0000000..57cb088
--- /dev/null
+++ b/utils/function.py
@@ -0,0 +1,418 @@
+"""
+Common Functions for FScanpy Models
+"""
+import os
+import numpy as np
+import pandas as pd
+import pickle
+import json
+from sklearn.metrics import (
+    accuracy_score,
+    recall_score,
+    precision_score,
+    roc_auc_score,
+    log_loss,
+    f1_score
+)
+from utils.config import BaseConfig
+
+def select_low_confidence_samples_cnn(model, X_unlabeled, unlabeled_data, confidence_threshold=0.5):
+    """
+    Select low-confidence samples and assign pseudo labels
+
+    Args:
+        model: Current model
+        X_unlabeled: Unlabeled data features
+        unlabeled_data: Unlabeled data DataFrame with a final_prob column used as confidence
+        confidence_threshold: Not used, kept for compatibility
+
+    Returns:
+        selected: Selected samples with pseudo labels
+    """
+    # Check if final_prob column exists
+    if 'final_prob' not in unlabeled_data.columns:
+        return pd.DataFrame()
+
+    # Predict probabilities for unlabeled data
+    probs = model.predict(X_unlabeled)
+
+    # For binary classification, Keras outputs the positive-class probability;
+    # build the complete probability distribution [1-p, p]
+    probs_full = np.column_stack([1 - probs, probs])
+
+    # Calculate entropy (prediction uncertainty)
+    epsilon = 1e-15
+    probs_full_safe = np.clip(probs_full, epsilon, 1 - epsilon)
+    entropy = -np.sum(probs_full_safe * np.log(probs_full_safe), axis=1)
+
+    # Get predicted labels
+    preds = (probs > 0.5).astype(int).flatten()
+
+    # Create result DataFrame
+    result_df = pd.DataFrame({
+        'entropy': entropy,
+        'pseudo_label': preds,
+        'prob': probs.flatten()
+    }, index=unlabeled_data.index)
+
+    # Get final_prob as confidence from the original data
+    final_probs = unlabeled_data['final_prob'].values
+
+    # Select samples based on: entropy < confidence (final_prob) & prediction prob > 0.5
+    selected_mask = (result_df['entropy'] < final_probs) & (result_df['prob'] > 0.5)
+
+    # Select qualifying samples
+    selected = unlabeled_data.loc[result_df[selected_mask].index].copy()
+
+    # Add pseudo labels to selected samples
+    if not selected.empty:
+        selected['label'] = result_df.loc[result_df[selected_mask].index, 'pseudo_label'].values
+
+    return selected
+
+def convert_numpy_types(obj):
+    """
+    Recursively convert NumPy data types to Python native types for JSON serialization
+
+    Args:
+        obj: Any object that may contain NumPy data types
+
+    Returns:
+        Converted object with all NumPy types converted to Python native types
+    """
+    if isinstance(obj, np.integer):
+        return int(obj)
+    elif isinstance(obj, np.floating):
+        return float(obj)
+    elif isinstance(obj, np.bool_):
+        return bool(obj)
+    elif isinstance(obj, np.ndarray):
+        return obj.tolist()
+    elif isinstance(obj, dict):
+        return {key: convert_numpy_types(value) for key, value in obj.items()}
+    elif isinstance(obj, list):
+        return [convert_numpy_types(item) for item in obj]
+    elif isinstance(obj, tuple):
+        return tuple(convert_numpy_types(item) for item in obj)
+    else:
+        return obj
+
+def save_training_info(model, training_info, save_dir, model_type="best", is_final_model=False):
+    """
+    Save model and training information
+
+    Args:
+        model: Trained model
+        training_info: Training information dictionary
+        save_dir: Save directory
+        model_type: Model type, "best" for best model, "final" for final model
+        is_final_model: Whether this is the final model from self-training
+    """
+    # Create save directory
+    os.makedirs(save_dir, exist_ok=True)
+
+    try:
+        # Save model
+        model_filename = f"{model_type}_model.h5"
+        model_path = os.path.join(save_dir, model_filename)
+        model.save(model_path)
+
+        # Prepare training info for JSON serialization
+        serializable_info = convert_numpy_types(training_info)
+
+        # Save training info
+        info_filename = f"{model_type}_training_info.json"
+        info_path = os.path.join(save_dir, info_filename)
+
+        with open(info_path, 'w') as f:
+            json.dump(serializable_info, f, indent=2)
+
+        # Save model weights separately
+        weights_filename = f"{model_type}_weights.pkl"
+        weights_path = os.path.join(save_dir, weights_filename)
+
+        with open(weights_path, 'wb') as f:
+            pickle.dump(model.get_weights(), f)
+
+    except Exception:
+        pass
+
+def load_data(neg_samples=20000):
+    """
+    Load data
+
+    Args:
+        neg_samples: Number of randomly selected EUPLOTES negative samples;
+                     if None, use all negative samples
+
+    Returns:
+        train_data: Training data
+        test_data: Test data
+        low_conf_data: Low confidence data
+        xu_data: Xu dataset as additional validation set
+        atkins_data: Atkins dataset as additional validation set
+    """
+    try:
+        # Load merged data files
+        train_data = pd.read_csv(BaseConfig.TRAIN_DATA)
+        test_data = pd.read_csv(BaseConfig.TEST_DATA)
+        validation_data = pd.read_csv(BaseConfig.VALIDATION_DATA)
+
+        # Ensure required columns exist
+        required_columns = ['full_seq', 'label', 'source']
+
+        for df in [train_data, test_data, validation_data]:
+            for col in required_columns:
+                if col not in df.columns:
+                    if col == 'label':
+                        df[col] = 0
+                    elif col == 'source':
+                        df[col] = 'unknown'
+                    else:
+                        df[col] = ''
+
+        # Separate validation datasets
+        xu_data = validation_data[validation_data['source'] == 'Xu'].copy()
+        atkins_data = validation_data[validation_data['source'] == 'Atkins'].copy()
+
+        # Create low confidence data (placeholder)
+        low_conf_data = pd.DataFrame()
+
+        # Sample negative samples if specified
+        if neg_samples is not None:
+            # Sample from EUPLOTES negative samples in training data
+            euplotes_neg = train_data[
+                (train_data['source'] == 'EUPLOTES') &
+                (train_data['label'] == 0)
+            ]
+
+            if len(euplotes_neg) > neg_samples:
+                sampled_neg = euplotes_neg.sample(n=neg_samples, random_state=42)
+                # Keep positive samples and other sources, replace EUPLOTES negatives
+                train_data = pd.concat([
+                    train_data[~((train_data['source'] == 'EUPLOTES') & (train_data['label'] == 0))],
+                    sampled_neg
+                ], ignore_index=True)
+
+        return train_data, test_data, low_conf_data, xu_data, atkins_data
+
+    except Exception:
+        # Return empty DataFrames on error
+        empty_df = pd.DataFrame()
+        return empty_df, empty_df, empty_df, empty_df, empty_df
+
+def select_low_confidence_samples_gb(model, X_unlabeled, unlabeled_data, confidence_threshold=0.5):
+    """
+    Select low-confidence samples and assign pseudo labels for the GB model
+
+    Args:
+        model: Current model
+        X_unlabeled: Unlabeled data features
+        unlabeled_data: Unlabeled data DataFrame with a final_prob column used as confidence
+        confidence_threshold: Not used, kept for compatibility
+
+    Returns:
+        selected: Low confidence samples with pseudo labels and sequence information
+    """
+    # Check if final_prob column exists
+    if 'final_prob' not in unlabeled_data.columns:
+        return pd.DataFrame()
+
+    try:
+        # Predict probabilities
+        probs = model.predict_proba(X_unlabeled)
+
+        # For binary classification, get positive class probability
+        if probs.shape[1] == 2:
+            pos_probs = probs[:, 1]
+        else:
+            pos_probs = probs.flatten()
+
+        # Calculate entropy
+        epsilon = 1e-15
+        probs_safe = np.clip(probs, epsilon, 1 - epsilon)
+        entropy = -np.sum(probs_safe * np.log(probs_safe), axis=1)
+
+        # Get predicted labels
+        preds = model.predict(X_unlabeled)
+
+        # Create result DataFrame
+        result_df = pd.DataFrame({
+            'entropy': entropy,
+            'pseudo_label': preds,
+            'prob': pos_probs
+        }, index=unlabeled_data.index)
+
+        # Get confidence from original data
+        final_probs = unlabeled_data['final_prob'].values
+
+        # Select samples: entropy < confidence & prediction prob > 0.5
+        selected_mask = (result_df['entropy'] < final_probs) & (result_df['prob'] > 0.5)
+
+        # Select qualifying samples
+        selected = unlabeled_data.loc[result_df[selected_mask].index].copy()
+
+        # Add pseudo labels
+        if not selected.empty:
+            selected['label'] = result_df.loc[result_df[selected_mask].index, 'pseudo_label'].values
+
+        return selected
+
+    except Exception:
+        return pd.DataFrame()
+
+def evaluate_model_gb(model, X, y):
+    """
+    Evaluate model performance for the GB model
+
+    Args:
+        model: Trained model
+        X: Feature matrix
+        y: Labels
+
+    Returns:
+        metrics: Performance metrics dictionary
+    """
+    default_metrics = {
+        'accuracy': 0.0, 'auc': 0.0, 'f1': 0.0,
+        'precision': 0.0, 'recall': 0.0, 'loss': float('inf')
+    }
+
+    try:
+        # Get predictions
+        y_pred = model.predict(X)
+        y_pred_proba = model.predict_proba(X)
+
+        # Get positive class probabilities
+        if y_pred_proba.shape[1] == 2:
+            y_pred_prob = y_pred_proba[:, 1]
+        else:
+            y_pred_prob = y_pred_proba.flatten()
+
+        metrics = default_metrics.copy()
+
+        # Calculate metrics individually
+        try:
+            metrics['accuracy'] = accuracy_score(y, y_pred)
+        except Exception:
+            pass
+
+        try:
+            if len(np.unique(y_pred_prob)) > 1:
+                metrics['auc'] = roc_auc_score(y, y_pred_prob)
+            else:
+                metrics['auc'] = 0.5
+        except Exception:
+            pass
+
+        try:
+            metrics['f1'] = f1_score(y, y_pred, zero_division=0)
+        except Exception:
+            pass
+
+        try:
+            metrics['precision'] = precision_score(y, y_pred, zero_division=0)
+        except Exception:
+            pass
+
+        try:
+            metrics['recall'] = recall_score(y, y_pred, zero_division=0)
+        except Exception:
+            pass
+
+        try:
+            y_pred_prob_safe = np.clip(y_pred_prob, 1e-15, 1 - 1e-15)
+            metrics['loss'] = log_loss(y, y_pred_prob_safe)
+        except Exception:
+            pass
+
+        return metrics
+
+    except Exception:
+        return default_metrics
+
+def evaluate_model_cnn(model, X_test, y_test):
+    """Evaluate CNN model performance"""
+    default_metrics = {
+        'accuracy': 0.0, 'auc': 0.0, 'f1': 0.0,
+        'precision': 0.0, 'recall': 0.0, 'loss': float('inf')
+    }
+
+    try:
+        # Batch prediction to avoid memory issues
+        batch_size = 128
+        n_samples = len(X_test)
+        n_batches = (n_samples + batch_size - 1) // batch_size
+
+        y_pred = np.zeros(n_samples)
+        for i in range(n_batches):
+            start_idx = i * batch_size
+            end_idx = min((i + 1) * batch_size, n_samples)
+            batch_preds = model.predict(X_test[start_idx:end_idx], verbose=0)
+
+            # Handle multi-output models
+            if isinstance(batch_preds, list):
+                batch_preds = batch_preds[0]
+
+            # Ensure predictions are 1D
+            if len(batch_preds.shape) > 1 and batch_preds.shape[1] > 1:
+                batch_preds = batch_preds[:, 1]
+            elif len(batch_preds.shape) > 1:
+                batch_preds = batch_preds.flatten()
+
+            y_pred[start_idx:end_idx] = batch_preds
+
+        # Convert probabilities to binary predictions
+        y_pred_binary = (y_pred > 0.5).astype(int)
+
+        # Ensure labels are 1D
+        if len(y_test.shape) > 1:
+            y_test = y_test.flatten()
+
+        metrics = default_metrics.copy()
+
+        # Calculate metrics individually
+        try:
+            metrics['accuracy'] = accuracy_score(y_test, y_pred_binary)
+        except Exception:
+            pass
+
+        try:
+            if len(np.unique(y_pred)) > 1:
+                metrics['auc'] = roc_auc_score(y_test, y_pred)
+            else:
+                # Handle case where all predictions are the same
+                if (np.mean(y_pred) > 0.5 and np.mean(y_test) > 0.5) or \
+                   (np.mean(y_pred) <= 0.5 and np.mean(y_test) < 0.5):
+                    metrics['auc'] = 0.55
+                else:
+                    metrics['auc'] = 0.45
+        except Exception:
+            pass
+
+        try:
+            metrics['f1'] = f1_score(y_test, y_pred_binary, zero_division=0)
+        except Exception:
+            pass
+
+        try:
+            metrics['precision'] = precision_score(y_test, y_pred_binary, zero_division=0)
+        except Exception:
+            pass
+
+        try:
+            metrics['recall'] = recall_score(y_test, y_pred_binary, zero_division=0)
+        except Exception:
+            pass
+
+        # Calculate loss
+        try:
+            y_pred_prob = np.clip(y_pred, 1e-15, 1 - 1e-15)
+            metrics['loss'] = log_loss(y_test, y_pred_prob)
+        except Exception:
+            pass
+
+        return metrics
+
+    except Exception:
+        return default_metrics
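
Not part of the patch: a minimal usage sketch of how the helpers in utils/function.py could be wired together for one gradient-boosting pseudo-labelling round. featurize() is a hypothetical placeholder for the project's real feature extraction (not defined in this diff), and BaseConfig's virtual paths must point at real CSV files for load_data() to return non-empty DataFrames.

# Illustrative sketch only -- assumes real data behind the BaseConfig paths and a
# hypothetical featurize() standing in for the project's feature extraction.
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier

from utils.function import (
    load_data,
    evaluate_model_gb,
    select_low_confidence_samples_gb,
)

def featurize(df):
    """Hypothetical placeholder: map the 'full_seq' column to a numeric matrix."""
    return np.zeros((len(df), 8))

# Load the merged splits; EUPLOTES negatives are downsampled to 20,000.
train_data, test_data, low_conf_data, xu_data, atkins_data = load_data(neg_samples=20000)

model = GradientBoostingClassifier(random_state=42)
model.fit(featurize(train_data), train_data['label'])

# Metrics dictionary: accuracy, auc, f1, precision, recall, loss.
print(evaluate_model_gb(model, featurize(test_data), test_data['label']))

# One pseudo-labelling step; low_conf_data must carry a 'final_prob' column.
if not low_conf_data.empty:
    pseudo = select_low_confidence_samples_gb(model, featurize(low_conf_data), low_conf_data)
    print(f"{len(pseudo)} samples received pseudo labels")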