""" Common Functions for FScanpy Models """ import os import numpy as np import pandas as pd import pickle import json from sklearn.metrics import ( accuracy_score, recall_score, precision_score, roc_auc_score, log_loss, f1_score ) from utils.config import BaseConfig def select_low_confidence_samples_cnn(model, X_unlabeled, unlabeled_data, confidence_threshold=0.5): """ Select low confidence samples and assign pseudo labels Args: model: Current model X_unlabeled: Unlabeled data features unlabeled_data: Unlabeled data DataFrame with final_prob column as confidence confidence_threshold: Not used, kept for compatibility Returns: selected: Selected samples with pseudo labels """ # Check if final_prob column exists if 'final_prob' not in unlabeled_data.columns: return pd.DataFrame() # Predict probabilities for unlabeled data probs = model.predict(X_unlabeled) # For binary classification, Keras outputs positive class probability # Build complete probability distribution [1-p, p] probs_full = np.column_stack([1-probs, probs]) # Calculate entropy (prediction uncertainty) epsilon = 1e-15 probs_full_safe = np.clip(probs_full, epsilon, 1 - epsilon) entropy = -np.sum(probs_full_safe * np.log(probs_full_safe), axis=1) # Get predicted labels preds = (probs > 0.5).astype(int).flatten() # Create result DataFrame result_df = pd.DataFrame({ 'entropy': entropy, 'pseudo_label': preds, 'prob': probs.flatten() }, index=unlabeled_data.index) # Get final_prob as confidence from original data final_probs = unlabeled_data['final_prob'].values # Select samples based on: entropy < confidence(final_prob) & prediction_prob > 0.5 selected_mask = (result_df['entropy'] < final_probs) & (result_df['prob'] > 0.5) # Select qualifying samples selected = unlabeled_data.loc[result_df[selected_mask].index].copy() # Add pseudo labels to selected samples if not selected.empty: selected['label'] = result_df.loc[result_df[selected_mask].index, 'pseudo_label'].values return selected def convert_numpy_types(obj): """ Recursively convert NumPy data types to Python native types for JSON serialization Args: obj: Any object that may contain NumPy data types Returns: Converted object with all NumPy types converted to Python native types """ if isinstance(obj, np.integer): return int(obj) elif isinstance(obj, np.floating): return float(obj) elif isinstance(obj, np.bool_): return bool(obj) elif isinstance(obj, np.ndarray): return obj.tolist() elif isinstance(obj, dict): return {key: convert_numpy_types(value) for key, value in obj.items()} elif isinstance(obj, list): return [convert_numpy_types(item) for item in obj] elif isinstance(obj, tuple): return tuple(convert_numpy_types(item) for item in obj) else: return obj def save_training_info(model, training_info, save_dir, model_type="best", is_final_model=False): """ Save model and training information Args: model: Trained model training_info: Training information dictionary save_dir: Save directory model_type: Model type, "best" for best model, "final" for final model is_final_model: Whether this is the final model from self-training """ # Create save directory os.makedirs(save_dir, exist_ok=True) try: # Save model model_filename = f"{model_type}_model.h5" model_path = os.path.join(save_dir, model_filename) model.save(model_path) # Prepare training info for JSON serialization serializable_info = convert_numpy_types(training_info) # Save training info info_filename = f"{model_type}_training_info.json" info_path = os.path.join(save_dir, info_filename) with open(info_path, 'w') as f: 
def load_data(neg_samples=20000):
    """
    Load the training, test, and validation data.

    Args:
        neg_samples: Number of randomly selected EUPLOTES negative samples;
            if None, all negative samples are used

    Returns:
        train_data: Training data
        test_data: Test data
        low_conf_data: Low confidence data
        xu_data: Xu dataset used as an additional validation set
        atkins_data: Atkins dataset used as an additional validation set
    """
    try:
        # Load the merged data files
        train_data = pd.read_csv(BaseConfig.TRAIN_DATA)
        test_data = pd.read_csv(BaseConfig.TEST_DATA)
        validation_data = pd.read_csv(BaseConfig.VALIDATION_DATA)

        # Ensure the required columns exist, filling sensible defaults
        required_columns = ['full_seq', 'label', 'source']
        for df in [train_data, test_data, validation_data]:
            for col in required_columns:
                if col not in df.columns:
                    if col == 'label':
                        df[col] = 0
                    elif col == 'source':
                        df[col] = 'unknown'
                    else:
                        df[col] = ''

        # Separate the validation datasets by source
        xu_data = validation_data[validation_data['source'] == 'Xu'].copy()
        atkins_data = validation_data[validation_data['source'] == 'Atkins'].copy()

        # Low confidence data (placeholder)
        low_conf_data = pd.DataFrame()

        # Subsample EUPLOTES negative samples if requested
        if neg_samples is not None:
            euplotes_neg = train_data[
                (train_data['source'] == 'EUPLOTES') & (train_data['label'] == 0)
            ]
            if len(euplotes_neg) > neg_samples:
                sampled_neg = euplotes_neg.sample(n=neg_samples, random_state=42)
                # Keep positive samples and other sources, replace the EUPLOTES negatives
                train_data = pd.concat([
                    train_data[~((train_data['source'] == 'EUPLOTES') & (train_data['label'] == 0))],
                    sampled_neg
                ], ignore_index=True)

        return train_data, test_data, low_conf_data, xu_data, atkins_data
    except Exception:
        # Return empty DataFrames on error
        empty_df = pd.DataFrame()
        return empty_df, empty_df, empty_df, empty_df, empty_df


def select_low_confidence_samples_gb(model, X_unlabeled, unlabeled_data, confidence_threshold=0.5):
    """
    Select unlabeled samples for pseudo-labeling with the GB model.

    A sample is selected when its prediction entropy is below its
    'final_prob' confidence score and its positive-class probability
    exceeds 0.5; the GB prediction is attached as the pseudo label.

    Args:
        model: Current model
        X_unlabeled: Unlabeled data features
        unlabeled_data: Unlabeled data DataFrame with a 'final_prob' confidence column
        confidence_threshold: Not used, kept for API compatibility

    Returns:
        selected: Selected samples with pseudo labels and sequence information
    """
    # The 'final_prob' confidence column is required for the selection rule
    if 'final_prob' not in unlabeled_data.columns:
        return pd.DataFrame()

    try:
        # Predict class probabilities
        probs = model.predict_proba(X_unlabeled)

        # For binary classification, take the positive-class probability
        if probs.shape[1] == 2:
            pos_probs = probs[:, 1]
        else:
            pos_probs = probs.flatten()

        # Calculate entropy (prediction uncertainty)
        epsilon = 1e-15
        probs_safe = np.clip(probs, epsilon, 1 - epsilon)
        entropy = -np.sum(probs_safe * np.log(probs_safe), axis=1)

        # Get predicted labels
        preds = model.predict(X_unlabeled)

        # Collect per-sample statistics, aligned with the original index
        result_df = pd.DataFrame({
            'entropy': entropy,
            'pseudo_label': preds,
            'prob': pos_probs
        }, index=unlabeled_data.index)

        # Use 'final_prob' as the per-sample confidence threshold
        final_probs = unlabeled_data['final_prob'].values

        # Selection rule: entropy < confidence (final_prob) and prediction prob > 0.5
        selected_mask = (result_df['entropy'] < final_probs) & (result_df['prob'] > 0.5)

        # Select qualifying samples and attach pseudo labels
        selected = unlabeled_data.loc[result_df[selected_mask].index].copy()
        if not selected.empty:
            selected['label'] = result_df.loc[result_df[selected_mask].index, 'pseudo_label'].values

        return selected
    except Exception:
        return pd.DataFrame()
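
# --- Illustrative usage sketch (not part of the original module) -------------
# One toy round of pseudo-label selection with a GradientBoostingClassifier on
# synthetic data. The feature matrices and the 'final_prob' values are made up
# here purely to show the expected shapes and the selection rule used above.
def _demo_gb_pseudo_labeling():
    from sklearn.ensemble import GradientBoostingClassifier

    rng = np.random.RandomState(0)
    X_labeled = rng.randn(200, 8)
    y_labeled = (X_labeled[:, 0] > 0).astype(int)
    X_unlabeled = rng.randn(50, 8)

    # 'final_prob' acts as the per-sample confidence budget for the entropy test
    unlabeled_df = pd.DataFrame({'final_prob': rng.uniform(0.3, 0.9, size=50)})

    gb = GradientBoostingClassifier(random_state=0).fit(X_labeled, y_labeled)
    selected = select_low_confidence_samples_gb(gb, X_unlabeled, unlabeled_df)

    # 'selected' carries a pseudo 'label' column and could be concatenated onto
    # the labeled pool before refitting in a self-training loop
    return selected
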
def evaluate_model_gb(model, X, y):
    """
    Evaluate GB model performance.

    Args:
        model: Trained model
        X: Feature matrix
        y: Labels

    Returns:
        metrics: Performance metrics dictionary
    """
    default_metrics = {
        'accuracy': 0.0,
        'auc': 0.0,
        'f1': 0.0,
        'precision': 0.0,
        'recall': 0.0,
        'loss': float('inf')
    }
    try:
        # Get predictions and class probabilities
        y_pred = model.predict(X)
        y_pred_proba = model.predict_proba(X)

        # Take the positive-class probability
        if y_pred_proba.shape[1] == 2:
            y_pred_prob = y_pred_proba[:, 1]
        else:
            y_pred_prob = y_pred_proba.flatten()

        metrics = default_metrics.copy()

        # Calculate each metric independently so one failure does not block the rest
        try:
            metrics['accuracy'] = accuracy_score(y, y_pred)
        except Exception:
            pass
        try:
            if len(np.unique(y_pred_prob)) > 1:
                metrics['auc'] = roc_auc_score(y, y_pred_prob)
            else:
                # AUC is undefined when all predicted probabilities are identical
                metrics['auc'] = 0.5
        except Exception:
            pass
        try:
            metrics['f1'] = f1_score(y, y_pred, zero_division=0)
        except Exception:
            pass
        try:
            metrics['precision'] = precision_score(y, y_pred, zero_division=0)
        except Exception:
            pass
        try:
            metrics['recall'] = recall_score(y, y_pred, zero_division=0)
        except Exception:
            pass
        try:
            y_pred_prob_safe = np.clip(y_pred_prob, 1e-15, 1 - 1e-15)
            metrics['loss'] = log_loss(y, y_pred_prob_safe)
        except Exception:
            pass

        return metrics
    except Exception:
        return default_metrics


def evaluate_model_cnn(model, X_test, y_test):
    """Evaluate CNN model performance."""
    default_metrics = {
        'accuracy': 0.0,
        'auc': 0.0,
        'f1': 0.0,
        'precision': 0.0,
        'recall': 0.0,
        'loss': float('inf')
    }
    try:
        # Predict in batches to avoid memory issues
        batch_size = 128
        n_samples = len(X_test)
        n_batches = (n_samples + batch_size - 1) // batch_size
        y_pred = np.zeros(n_samples)

        for i in range(n_batches):
            start_idx = i * batch_size
            end_idx = min((i + 1) * batch_size, n_samples)
            batch_preds = model.predict(X_test[start_idx:end_idx], verbose=0)

            # Handle multi-output models
            if isinstance(batch_preds, list):
                batch_preds = batch_preds[0]

            # Ensure predictions are 1D positive-class probabilities
            if len(batch_preds.shape) > 1 and batch_preds.shape[1] > 1:
                batch_preds = batch_preds[:, 1]
            elif len(batch_preds.shape) > 1:
                batch_preds = batch_preds.flatten()

            y_pred[start_idx:end_idx] = batch_preds

        # Convert probabilities to binary predictions
        y_pred_binary = (y_pred > 0.5).astype(int)

        # Ensure labels are 1D
        if len(y_test.shape) > 1:
            y_test = y_test.flatten()

        metrics = default_metrics.copy()

        # Calculate each metric independently so one failure does not block the rest
        try:
            metrics['accuracy'] = accuracy_score(y_test, y_pred_binary)
        except Exception:
            pass
        try:
            if len(np.unique(y_pred)) > 1:
                metrics['auc'] = roc_auc_score(y_test, y_pred)
            else:
                # All predictions are identical, so AUC is undefined; fall back to a
                # heuristic value depending on whether the constant prediction
                # matches the majority class
                if (np.mean(y_pred) > 0.5 and np.mean(y_test) > 0.5) or \
                   (np.mean(y_pred) <= 0.5 and np.mean(y_test) < 0.5):
                    metrics['auc'] = 0.55
                else:
                    metrics['auc'] = 0.45
        except Exception:
            pass
        try:
            metrics['f1'] = f1_score(y_test, y_pred_binary, zero_division=0)
        except Exception:
            pass
        try:
            metrics['precision'] = precision_score(y_test, y_pred_binary, zero_division=0)
        except Exception:
            pass
        try:
            metrics['recall'] = recall_score(y_test, y_pred_binary, zero_division=0)
        except Exception:
            pass
        try:
            y_pred_prob = np.clip(y_pred, 1e-15, 1 - 1e-15)
            metrics['loss'] = log_loss(y_test, y_pred_prob)
        except Exception:
            pass

        return metrics
    except Exception:
        return default_metrics
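
# --- Illustrative smoke test (not part of the original module) ---------------
# Runs evaluate_model_gb on synthetic data and shows convert_numpy_types
# preparing the resulting metrics for JSON serialization; the data and model
# here are made up for demonstration only.
if __name__ == "__main__":
    from sklearn.ensemble import GradientBoostingClassifier

    rng = np.random.RandomState(42)
    X = rng.randn(300, 6)
    y = (X[:, 0] + 0.5 * X[:, 1] > 0).astype(int)

    clf = GradientBoostingClassifier(random_state=42).fit(X, y)
    metrics = evaluate_model_gb(clf, X, y)

    # NumPy scalar types in the metrics dict are converted to plain Python
    # types before dumping to JSON
    print(json.dumps(convert_numpy_types(metrics), indent=2))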