283 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			283 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
| import numpy as np
 | ||
| import pandas as pd
 | ||
| import itertools
 | ||
| from typing import List, Dict, Union
 | ||
| 
 | ||
| class SequenceFeatureExtractor:
 | ||
|     """DNA序列特征提取器"""
 | ||
|     
 | ||
|     def __init__(self, seq_length=33):
 | ||
|         """初始化特征提取器"""
 | ||
|         self.bases = ['A', 'T', 'G', 'C']
 | ||
|         self.valid_bases = set('ATGCN')
 | ||
|         self.seq_length = seq_length  # 添加序列长度配置
 | ||
|         self.feature_names = self._get_feature_names()
 | ||
|     
 | ||
|     def _get_feature_names(self) -> List[str]:
 | ||
|         """
 | ||
|         返回特征名称列表,包含所有可能的碱基特征
 | ||
|         
 | ||
|         Returns:
 | ||
|             features: 特征名称列表
 | ||
|         """
 | ||
|         features = []
 | ||
| 
 | ||
|         # 基础特征 (包含N)
 | ||
|         bases = ['A', 'T', 'G', 'C', 'N']
 | ||
|         features.extend(bases)
 | ||
| 
 | ||
|         # 3-mer特征
 | ||
|         kmers_3 = [''.join(p) for p in itertools.product(bases, repeat=3)]  # 125个特征
 | ||
|         features.extend(kmers_3)
 | ||
| 
 | ||
|         # 密码子特征
 | ||
|         codons = [''.join(p) for p in itertools.product(['A', 'T', 'G', 'C'], repeat=3)]  # 64个密码子
 | ||
|         n_codons = self.seq_length // 3  # 计算序列中包含的完整密码子数量
 | ||
|         for i in range(n_codons):
 | ||
|             for codon in codons:
 | ||
|                 features.append(f'codon_pos_{i}_{codon}')
 | ||
| 
 | ||
|         # GC含量特征
 | ||
|         features.append('gc_content')
 | ||
| 
 | ||
|         # 序列复杂度特征
 | ||
|         features.append('sequence_complexity')
 | ||
| 
 | ||
|         return features
 | ||
| 
 | ||
|     def trim_sequence(self, seq, target_length):
 | ||
|         """
 | ||
|         从序列两端等量截取,使其达到目标长度
 | ||
|         
 | ||
|         Args:
 | ||
|             seq: 原始序列
 | ||
|             target_length: 目标长度
 | ||
|             
 | ||
|         Returns:
 | ||
|             截取后的序列
 | ||
|         """
 | ||
|         if len(seq) <= target_length:
 | ||
|             return seq
 | ||
|             
 | ||
|         # 计算需要从每端截取的长度
 | ||
|         excess = len(seq) - target_length
 | ||
|         trim_each_side = excess // 2
 | ||
|         
 | ||
|         # 从两端等量截取,保持中心位置不变
 | ||
|         return seq[trim_each_side:len(seq)-trim_each_side]
 | ||
|     
 | ||
|     def _preprocess_sequence(self, sequence):
 | ||
|         """
 | ||
|         将DNA序列转换为特征向量
 | ||
|         
 | ||
|         Args:
 | ||
|             sequence: DNA序列
 | ||
|             
 | ||
|         Returns:
 | ||
|             feature_vector: 特征向量
 | ||
|         """
 | ||
|         try:
 | ||
|             feature_names = self.feature_names
 | ||
|             
 | ||
|             if pd.isna(sequence) or not isinstance(sequence, str):
 | ||
|                 sequence = str(sequence)
 | ||
|             sequence = sequence.upper().replace('U', 'T')  # 统一为大写字母
 | ||
| 
 | ||
|             # 如果序列长度不等于目标长度,进行截取或填充
 | ||
|             if len(sequence) > self.seq_length:
 | ||
|                 sequence = self.trim_sequence(sequence, self.seq_length)
 | ||
|             else:
 | ||
|                 sequence = sequence[:self.seq_length].ljust(self.seq_length, 'N')
 | ||
| 
 | ||
|             # 初始化特征字典
 | ||
|             features = {
 | ||
|                 'A': 0,
 | ||
|                 'T': 0,
 | ||
|                 'G': 0,
 | ||
|                 'C': 0,
 | ||
|                 'N': 0
 | ||
|             }
 | ||
|             kmer_features = {}
 | ||
| 
 | ||
|             # 碱基组成
 | ||
|             for base in ['A', 'T', 'G', 'C', 'N']:
 | ||
|                 features[base] = sequence.count(base) / self.seq_length
 | ||
| 
 | ||
|             # 3-mer特征
 | ||
|             for kmer in [''.join(p) for p in itertools.product(['A', 'T', 'G', 'C', 'N'], repeat=3)]:
 | ||
|                 kmer_count = 0
 | ||
|                 for i in range(self.seq_length - 2):
 | ||
|                     if sequence[i:i+3] == kmer:
 | ||
|                         kmer_count += 1
 | ||
|                 kmer_features[kmer] = kmer_count / max(1, self.seq_length - 2)
 | ||
| 
 | ||
|             # 密码子特征
 | ||
|             codon_features = {}
 | ||
|             codons = [''.join(p) for p in itertools.product(['A', 'T', 'G', 'C'], repeat=3)]  # 64个密码子
 | ||
|             n_codons = self.seq_length // 3  # 计算序列中包含的完整密码子数量
 | ||
|             for i in range(n_codons):
 | ||
|                 pos_start = i * 3
 | ||
|                 current_codon = sequence[pos_start:pos_start+3]
 | ||
|                 for codon in codons:
 | ||
|                     codon_features[f'codon_pos_{i}_{codon}'] = 1 if current_codon == codon and 'N' not in current_codon else 0
 | ||
| 
 | ||
|             # GC含量
 | ||
|             valid_bases = [b for b in sequence if b != 'N']
 | ||
|             gc_content = (valid_bases.count('G') + valid_bases.count('C')) / len(valid_bases) if valid_bases else 0
 | ||
| 
 | ||
|             # 序列复杂度(Shannon熵)
 | ||
|             from collections import Counter
 | ||
|             valid_counts = Counter(valid_bases)
 | ||
|             total_valid = sum(valid_counts.values())
 | ||
|             entropy = 0
 | ||
|             for cnt in valid_counts.values():
 | ||
|                 p = cnt / total_valid
 | ||
|                 entropy += -p * np.log2(p)
 | ||
|             entropy /= np.log2(4)  # 归一化到0-1
 | ||
| 
 | ||
|             # 合并所有特征
 | ||
|             all_features = {**features, **kmer_features, **codon_features}
 | ||
|             all_features['gc_content'] = gc_content
 | ||
|             all_features['sequence_complexity'] = entropy
 | ||
| 
 | ||
|             # 确保特征顺序一致
 | ||
|             feature_vector = [all_features.get(f, 0.0) for f in feature_names]
 | ||
| 
 | ||
|             return feature_vector
 | ||
|         except Exception as e:
 | ||
|             raise ValueError(f"特征提取失败: {str(e)}")
 | ||
|     
 | ||
|     def extract_features_batch(self, sequences: List[Union[str, float]]) -> np.ndarray:
 | ||
|         """
 | ||
|         批量提取特征
 | ||
|         
 | ||
|         Args:
 | ||
|             sequences: DNA序列列表
 | ||
|             
 | ||
|         Returns:
 | ||
|             np.ndarray: 特征矩阵
 | ||
|         """
 | ||
|         try:
 | ||
|             return np.array([self.extract_features(seq) for seq in sequences])
 | ||
|         except Exception as e:
 | ||
|             raise ValueError(f"批量特征提取失败: {str(e)}")
 | ||
|     
 | ||
|     def predict_region_batch(self, data: pd.DataFrame, gb_threshold: float = 0.1) -> pd.DataFrame:
 | ||
|         """
 | ||
|         批量预测区域序列
 | ||
|         
 | ||
|         Args:
 | ||
|             data: DataFrame包含'33bp'和'399bp'列
 | ||
|             gb_threshold: GB模型概率阈值(默认为0.1)
 | ||
|             
 | ||
|         Returns:
 | ||
|             DataFrame: 包含预测结果的DataFrame
 | ||
|         """
 | ||
|         results = []
 | ||
|         for idx, row in data.iterrows():
 | ||
|             try:
 | ||
|                 # 确保序列是字符串
 | ||
|                 seq_33bp = str(row['33bp'])
 | ||
|                 seq = str(row['399bp'])
 | ||
|                 
 | ||
|                 # 确保序列长度正确
 | ||
|                 seq_33bp = self._preprocess_sequence(seq_33bp)
 | ||
|                 seq = self._preprocess_sequence(seq)
 | ||
|                 
 | ||
|                 # 预测
 | ||
|                 result = self.predict_region(seq_33bp, seq, gb_threshold)
 | ||
|                 
 | ||
|                 # 添加原始数据的其他列
 | ||
|                 for col in data.columns:
 | ||
|                     if col not in ['33bp', '399bp']:
 | ||
|                         result[col] = row[col]
 | ||
|                         
 | ||
|                 results.append(result)
 | ||
|                 
 | ||
|             except Exception as e:
 | ||
|                 print(f"处理索引 {idx} 的序列时出错: {str(e)}")
 | ||
|                 continue
 | ||
|                 
 | ||
|         return pd.DataFrame(results)
 | ||
| 
 | ||
|     def extract_features(self, sequence: str) -> list:
 | ||
|         """
 | ||
|         提取序列特征
 | ||
|         
 | ||
|         Args:
 | ||
|             sequence: DNA序列
 | ||
|             
 | ||
|         Returns:
 | ||
|             list: 特征向量
 | ||
|         """
 | ||
|         try:
 | ||
|             # 确保输入是字符串
 | ||
|             if not isinstance(sequence, str):
 | ||
|                 sequence = str(sequence)
 | ||
|             
 | ||
|             # 大写并替换U为T
 | ||
|             sequence = sequence.upper().replace('U', 'T')
 | ||
|             
 | ||
|             # 如果序列长度不等于目标长度,进行截取
 | ||
|             if len(sequence) != self.seq_length:
 | ||
|                 sequence = self.trim_sequence(sequence, self.seq_length)
 | ||
|             
 | ||
|             # 初始化特征列表
 | ||
|             features = []
 | ||
|             
 | ||
|             try:
 | ||
|                 # 基础特征 (碱基频率)
 | ||
|                 for base in ['A', 'T', 'G', 'C', 'N']:
 | ||
|                     features.append(sequence.count(base) / len(sequence))
 | ||
|                 
 | ||
|                 # 3-mer特征
 | ||
|                 for kmer in [''.join(p) for p in itertools.product(['A', 'T', 'G', 'C', 'N'], repeat=3)]:
 | ||
|                     count = 0
 | ||
|                     for i in range(len(sequence) - 2):
 | ||
|                         if sequence[i:i+3] == kmer:
 | ||
|                             count += 1
 | ||
|                     features.append(count / max(1, len(sequence) - 2))
 | ||
|                 
 | ||
|                 # 密码子特征
 | ||
|                 codons = [''.join(p) for p in itertools.product(['A', 'T', 'G', 'C'], repeat=3)]
 | ||
|                 n_codons = len(sequence) // 3
 | ||
|                 for i in range(n_codons):
 | ||
|                     pos_start = i * 3
 | ||
|                     current_codon = sequence[pos_start:pos_start+3]
 | ||
|                     for codon in codons:
 | ||
|                         features.append(1 if current_codon == codon and 'N' not in current_codon else 0)
 | ||
|                 
 | ||
|                 # GC含量
 | ||
|                 valid_bases = [b for b in sequence if b != 'N']
 | ||
|                 gc_content = (valid_bases.count('G') + valid_bases.count('C')) / len(valid_bases) if valid_bases else 0
 | ||
|                 features.append(gc_content)
 | ||
|                 
 | ||
|                 # 序列复杂度
 | ||
|                 from collections import Counter
 | ||
|                 valid_counts = Counter(valid_bases)
 | ||
|                 total_valid = sum(valid_counts.values())
 | ||
|                 entropy = 0
 | ||
|                 if total_valid > 0:  # 避免除零错误
 | ||
|                     for cnt in valid_counts.values():
 | ||
|                         if cnt > 0:  # 避免log(0)
 | ||
|                             p = cnt / total_valid
 | ||
|                             entropy += -p * np.log2(p)
 | ||
|                     entropy /= np.log2(4) if len(valid_counts) > 0 else 1  # 归一化到0-1,避免除零
 | ||
|                 features.append(entropy)
 | ||
|                 
 | ||
|                 # 确保返回的是一维列表或数组
 | ||
|                 if isinstance(features, np.ndarray) and features.ndim > 1:
 | ||
|                     features = features.flatten()
 | ||
|                 
 | ||
|                 return features
 | ||
|             
 | ||
|             except Exception as e:
 | ||
|                 print(f"特征计算过程出错: {str(e)}")
 | ||
|                 # 如果计算过程出错,返回正确长度的全零特征向量
 | ||
|                 expected_length = 5 + 125 + (len(sequence) // 3) * 64 + 2  # 根据特征提取逻辑计算特征向量长度
 | ||
|                 return [0.0] * expected_length
 | ||
|                 
 | ||
|         except Exception as e:
 | ||
|             print(f"特征提取失败: {str(e)}")
 | ||
|             # 返回一个空列表,调用方需处理这种情况
 | ||
|             return [] |