FScanpy-commit-code/utils/function.py

419 lines
14 KiB
Python
Raw Permalink Normal View History

2025-08-17 15:31:58 +08:00
"""
Common Functions for FScanpy Models
"""
import os
import numpy as np
import pandas as pd
import pickle
import json
from sklearn.metrics import (
accuracy_score,
recall_score,
precision_score,
roc_auc_score,
log_loss,
f1_score
)
from utils.config import BaseConfig
def select_low_confidence_samples_cnn(model, X_unlabeled, unlabeled_data, confidence_threshold=0.5):
    """
    Select unlabeled samples and assign pseudo labels using a model whose
    ``predict`` returns a single positive-class probability per row.

    A row is kept when the binary entropy of its prediction is strictly below
    the row's ``final_prob`` value AND its predicted probability is above 0.5.

    Args:
        model: Current model; ``model.predict(X_unlabeled)`` must yield one
            probability per row of ``unlabeled_data``
        X_unlabeled: Unlabeled data features, row-aligned with unlabeled_data
        unlabeled_data: Unlabeled data DataFrame with final_prob column as confidence
        confidence_threshold: Not used, kept for compatibility
    Returns:
        selected: Copy of the selected rows. When at least one row is selected,
            a ``label`` column with the pseudo labels is added. A column-less
            empty DataFrame is returned when ``final_prob`` is missing.
    """
    # Without the confidence column there is nothing to compare entropy against
    if 'final_prob' not in unlabeled_data.columns:
        return pd.DataFrame()

    # One positive-class probability per row
    pos_probs = np.asarray(model.predict(X_unlabeled)).flatten()

    # Build the full distribution [1-p, p] and compute its entropy;
    # clipping keeps log() finite for probabilities of exactly 0 or 1
    epsilon = 1e-15
    distribution = np.clip(
        np.column_stack([1 - pos_probs, pos_probs]), epsilon, 1 - epsilon
    )
    entropy = -np.sum(distribution * np.log(distribution), axis=1)

    pseudo_labels = (pos_probs > 0.5).astype(int)
    confidence = unlabeled_data['final_prob'].to_numpy()

    # Positional boolean mask: selecting by position (rather than by index
    # label) stays correct even when unlabeled_data has duplicate index labels
    keep = (entropy < confidence) & (pos_probs > 0.5)
    selected = unlabeled_data.loc[keep].copy()

    if not selected.empty:
        selected['label'] = pseudo_labels[keep]
    return selected
def convert_numpy_types(obj):
    """
    Recursively convert NumPy data types to Python native types for JSON serialization

    Dict keys are converted as well as values, because ``json.dump`` rejects
    NumPy scalar keys. Object-dtype arrays are converted element by element,
    since ``ndarray.tolist()`` leaves their elements untouched.

    Args:
        obj: Any object that may contain NumPy data types
    Returns:
        Converted object with all NumPy types converted to Python native types.
        Dicts, lists and tuples are rebuilt (tuples stay tuples); any other
        object is returned unchanged.
    """
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        as_list = obj.tolist()
        # Numeric arrays are already native after tolist(); object arrays may
        # still hold NumPy scalars or nested containers
        return convert_numpy_types(as_list) if obj.dtype == object else as_list
    if isinstance(obj, dict):
        return {
            convert_numpy_types(key): convert_numpy_types(value)
            for key, value in obj.items()
        }
    if isinstance(obj, list):
        return [convert_numpy_types(item) for item in obj]
    if isinstance(obj, tuple):
        return tuple(convert_numpy_types(item) for item in obj)
    return obj
def save_training_info(model, training_info, save_dir, model_type="best", is_final_model=False):
    """
    Save model and training information

    Three files are written into ``save_dir``, in this order:
    ``{model_type}_model.h5`` (via ``model.save``),
    ``{model_type}_training_info.json`` and ``{model_type}_weights.pkl``
    (pickled ``model.get_weights()``). Saving is best-effort: the first failure
    aborts the remaining steps and is logged instead of being raised.

    Args:
        model: Trained model exposing ``save(path)`` and ``get_weights()``
        training_info: Training information dictionary
        save_dir: Save directory (created if it does not exist)
        model_type: Model type, "best" for best model, "final" for final model;
            used as the file-name prefix
        is_final_model: Whether this is the final model from self-training
            (currently unused by this function)
    Returns:
        None
    """
    import logging

    # Create save directory (errors here propagate to the caller, as before)
    os.makedirs(save_dir, exist_ok=True)

    try:
        # Save model
        model_path = os.path.join(save_dir, f"{model_type}_model.h5")
        model.save(model_path)

        # Save training info, converted to JSON-serializable native types
        info_path = os.path.join(save_dir, f"{model_type}_training_info.json")
        with open(info_path, 'w', encoding='utf-8') as info_file:
            json.dump(convert_numpy_types(training_info), info_file, indent=2)

        # Save model weights separately.
        # NOTE: pickle files must only ever be loaded from trusted sources.
        weights_path = os.path.join(save_dir, f"{model_type}_weights.pkl")
        with open(weights_path, 'wb') as weights_file:
            pickle.dump(model.get_weights(), weights_file)
    except Exception:
        # Keep the best-effort contract (never raise), but leave a trace so a
        # missing checkpoint can be diagnosed.
        logging.getLogger(__name__).exception(
            "Failed to save %s model artifacts to %s", model_type, save_dir
        )
def load_data(neg_samples=20000):
    """
    Load data

    Reads the CSV files named by ``BaseConfig.TRAIN_DATA``,
    ``BaseConfig.TEST_DATA`` and ``BaseConfig.VALIDATION_DATA``. Missing
    required columns are filled with defaults (``full_seq``: '', ``label``: 0,
    ``source``: 'unknown').

    Args:
        neg_samples: Number of randomly selected EUPLOTES negative samples,
            if None use all negative samples. Down-sampling only happens when
            more than ``neg_samples`` such rows exist; it uses a fixed seed and
            appends the sampled rows after all other training rows.
    Returns:
        train_data: Training data
        test_data: Test data
        low_conf_data: Low confidence data (currently always an empty DataFrame)
        xu_data: Xu dataset as additional validation set
        atkins_data: Atkins dataset as additional validation set

        On any error the failure is logged and five independent empty
        DataFrames are returned.
    """
    import logging

    try:
        # Load merged data files
        train_data = pd.read_csv(BaseConfig.TRAIN_DATA)
        test_data = pd.read_csv(BaseConfig.TEST_DATA)
        validation_data = pd.read_csv(BaseConfig.VALIDATION_DATA)

        # Ensure required columns exist, filling absent ones with a default
        column_defaults = {'full_seq': '', 'label': 0, 'source': 'unknown'}
        for frame in (train_data, test_data, validation_data):
            for column, default in column_defaults.items():
                if column not in frame.columns:
                    frame[column] = default

        # Separate validation datasets by their source tag
        xu_data = validation_data[validation_data['source'] == 'Xu'].copy()
        atkins_data = validation_data[validation_data['source'] == 'Atkins'].copy()

        # Create low confidence data (placeholder)
        low_conf_data = pd.DataFrame()

        # Down-sample EUPLOTES negatives if requested
        if neg_samples is not None:
            is_euplotes_neg = (
                (train_data['source'] == 'EUPLOTES') & (train_data['label'] == 0)
            )
            euplotes_neg = train_data[is_euplotes_neg]
            if len(euplotes_neg) > neg_samples:
                sampled_neg = euplotes_neg.sample(n=neg_samples, random_state=42)
                # Keep positive samples and other sources, replace EUPLOTES negatives
                train_data = pd.concat(
                    [train_data[~is_euplotes_neg], sampled_neg],
                    ignore_index=True,
                )

        return train_data, test_data, low_conf_data, xu_data, atkins_data
    except Exception:
        logging.getLogger(__name__).exception("Failed to load FScanpy datasets")
        # Distinct objects: callers may mutate one result without affecting the others
        return tuple(pd.DataFrame() for _ in range(5))
def select_low_confidence_samples_gb(model, X_unlabeled, unlabeled_data, confidence_threshold=0.5):
    """
    Select low confidence samples and assign pseudo labels for GB model

    A row is kept when the entropy of its ``predict_proba`` output is strictly
    below the row's ``final_prob`` value AND its positive-class probability is
    above 0.5. Pseudo labels come from ``model.predict``.

    Args:
        model: Current model exposing ``predict_proba`` and ``predict``
        X_unlabeled: Unlabeled data features, row-aligned with unlabeled_data
        unlabeled_data: Unlabeled data DataFrame with final_prob column as confidence
        confidence_threshold: Not used, kept for compatibility
    Returns:
        selected: Copy of the selected rows. When at least one row is selected,
            a ``label`` column with the pseudo labels is added. A column-less
            empty DataFrame is returned when ``final_prob`` is missing or when
            any step fails (the failure is logged).
    """
    import logging

    # Without the confidence column there is nothing to compare entropy against
    if 'final_prob' not in unlabeled_data.columns:
        return pd.DataFrame()

    try:
        probs = model.predict_proba(X_unlabeled)

        # For binary classification, take the positive class column
        pos_probs = probs[:, 1] if probs.shape[1] == 2 else probs.flatten()

        # Entropy over the predicted distribution; clipping keeps log() finite
        epsilon = 1e-15
        probs_safe = np.clip(probs, epsilon, 1 - epsilon)
        entropy = -np.sum(probs_safe * np.log(probs_safe), axis=1)

        pseudo_labels = np.asarray(model.predict(X_unlabeled)).ravel()
        confidence = unlabeled_data['final_prob'].to_numpy()

        # Positional boolean mask: selecting by position (rather than by index
        # label) stays correct even when unlabeled_data has duplicate index labels
        keep = (entropy < confidence) & (pos_probs > 0.5)
        selected = unlabeled_data.loc[keep].copy()

        if not selected.empty:
            selected['label'] = pseudo_labels[keep]
        return selected
    except Exception:
        # Best-effort: report the problem and fall back to "nothing selected"
        logging.getLogger(__name__).exception(
            "Pseudo-label selection for GB model failed"
        )
        return pd.DataFrame()
def evaluate_model_gb(model, X, y):
    """
    Evaluate model performance for GB model

    Each metric is computed independently; a metric that cannot be computed
    keeps its default value and the failure is logged. If prediction itself
    fails, all defaults are returned.

    Args:
        model: Trained model exposing ``predict`` and ``predict_proba``
        X: Feature matrix
        y: Labels
    Returns:
        metrics: Performance metrics dictionary with keys ``accuracy``, ``auc``,
            ``f1``, ``precision``, ``recall`` and ``loss``. Defaults are 0.0 for
            every score and ``inf`` for ``loss``. ``auc`` is 0.5 when all
            predicted probabilities are identical.
    """
    import logging

    logger = logging.getLogger(__name__)
    default_metrics = {
        'accuracy': 0.0, 'auc': 0.0, 'f1': 0.0,
        'precision': 0.0, 'recall': 0.0, 'loss': float('inf')
    }

    def _guarded(name, compute):
        # Compute one metric; on failure log it and fall back to its default
        try:
            return compute()
        except Exception as exc:
            logger.warning("Could not compute %s for GB model: %s", name, exc)
            return default_metrics[name]

    try:
        # Get predictions
        y_pred = model.predict(X)
        y_pred_proba = model.predict_proba(X)

        # Get positive class probabilities
        if y_pred_proba.shape[1] == 2:
            y_pred_prob = y_pred_proba[:, 1]
        else:
            y_pred_prob = y_pred_proba.flatten()

        def _auc():
            # Identical scores for every sample carry no ranking information
            if len(np.unique(y_pred_prob)) > 1:
                return roc_auc_score(y, y_pred_prob)
            return 0.5

        def _loss():
            # Clip so that log() stays finite for probabilities of 0 or 1
            return log_loss(y, np.clip(y_pred_prob, 1e-15, 1 - 1e-15))

        return {
            'accuracy': _guarded('accuracy', lambda: accuracy_score(y, y_pred)),
            'auc': _guarded('auc', _auc),
            'f1': _guarded('f1', lambda: f1_score(y, y_pred, zero_division=0)),
            'precision': _guarded(
                'precision', lambda: precision_score(y, y_pred, zero_division=0)
            ),
            'recall': _guarded(
                'recall', lambda: recall_score(y, y_pred, zero_division=0)
            ),
            'loss': _guarded('loss', _loss),
        }
    except Exception:
        logger.exception("GB model evaluation failed; returning default metrics")
        return default_metrics.copy()
def evaluate_model_cnn(model, X_test, y_test, batch_size=128):
    """
    Evaluate CNN model performance

    Predictions are made in batches. Each metric is computed independently; a
    metric that cannot be computed keeps its default value and the failure is
    logged. If prediction itself fails, all defaults are returned.

    Args:
        model: Trained model; ``model.predict(batch, verbose=0)`` must return
            probabilities (or a list whose first element holds them). For 2-D
            outputs with more than one column, column 1 is used.
        X_test: Test features supporting ``len()`` and slicing
        y_test: Test labels; any array-like, flattened to 1-D
        batch_size: Number of samples per ``predict`` call (default 128)
    Returns:
        metrics: Performance metrics dictionary with keys ``accuracy``, ``auc``,
            ``f1``, ``precision``, ``recall`` and ``loss``. Defaults are 0.0 for
            every score and ``inf`` for ``loss``. ``auc`` is 0.5 when all
            predicted probabilities are identical.
    """
    import logging

    logger = logging.getLogger(__name__)
    default_metrics = {
        'accuracy': 0.0, 'auc': 0.0, 'f1': 0.0,
        'precision': 0.0, 'recall': 0.0, 'loss': float('inf')
    }

    def _guarded(name, compute):
        # Compute one metric; on failure log it and fall back to its default
        try:
            return compute()
        except Exception as exc:
            logger.warning("Could not compute %s for CNN model: %s", name, exc)
            return default_metrics[name]

    try:
        # Batch prediction to avoid memory issues
        n_samples = len(X_test)
        y_pred = np.zeros(n_samples)
        for start_idx in range(0, n_samples, batch_size):
            end_idx = min(start_idx + batch_size, n_samples)
            batch_preds = model.predict(X_test[start_idx:end_idx], verbose=0)

            # Handle multi-output models: use the first output
            if isinstance(batch_preds, list):
                batch_preds = batch_preds[0]
            batch_preds = np.asarray(batch_preds)

            # Ensure predictions are 1D
            if batch_preds.ndim > 1:
                if batch_preds.shape[1] > 1:
                    batch_preds = batch_preds[:, 1]
                else:
                    batch_preds = batch_preds.flatten()
            y_pred[start_idx:end_idx] = batch_preds

        # Convert probabilities to binary predictions
        y_pred_binary = (y_pred > 0.5).astype(int)

        # Ensure labels are a 1D array (also accepts lists and column vectors)
        y_true = np.asarray(y_test).ravel()

        def _auc():
            # Identical scores for every sample carry no ranking information,
            # so report chance level instead of an invented value
            if len(np.unique(y_pred)) > 1:
                return roc_auc_score(y_true, y_pred)
            return 0.5

        def _loss():
            # Clip so that log() stays finite for probabilities of 0 or 1
            return log_loss(y_true, np.clip(y_pred, 1e-15, 1 - 1e-15))

        return {
            'accuracy': _guarded(
                'accuracy', lambda: accuracy_score(y_true, y_pred_binary)
            ),
            'auc': _guarded('auc', _auc),
            'f1': _guarded(
                'f1', lambda: f1_score(y_true, y_pred_binary, zero_division=0)
            ),
            'precision': _guarded(
                'precision',
                lambda: precision_score(y_true, y_pred_binary, zero_division=0),
            ),
            'recall': _guarded(
                'recall',
                lambda: recall_score(y_true, y_pred_binary, zero_division=0),
            ),
            'loss': _guarded('loss', _loss),
        }
    except Exception:
        logger.exception("CNN model evaluation failed; returning default metrics")
        return default_metrics.copy()