FScanpy-package/FScanpy/features/sequence.py

import itertools
from collections import Counter
from typing import List, Union

import numpy as np
import pandas as pd

class SequenceFeatureExtractor:
    """Feature extractor for DNA sequences."""

    def __init__(self, seq_length=33):
        """Initialize the extractor with a fixed target sequence length."""
        self.bases = ['A', 'T', 'G', 'C']
        self.valid_bases = set('ATGCN')
        self.seq_length = seq_length  # target length used for trimming/padding
        self.feature_names = self._get_feature_names()

    def _get_feature_names(self) -> List[str]:
        """
        Return the ordered list of feature names, covering every possible base feature.

        Returns:
            features: list of feature names
        """
        features = []
        # Single-base composition features (N included)
        bases = ['A', 'T', 'G', 'C', 'N']
        features.extend(bases)
        # 3-mer features over the 5-letter alphabet (125 features)
        kmers_3 = [''.join(p) for p in itertools.product(bases, repeat=3)]
        features.extend(kmers_3)
        # Positional codon features (64 codons per position)
        codons = [''.join(p) for p in itertools.product(['A', 'T', 'G', 'C'], repeat=3)]
        n_codons = self.seq_length // 3  # number of complete codons in the sequence
        for i in range(n_codons):
            for codon in codons:
                features.append(f'codon_pos_{i}_{codon}')
        # GC content feature
        features.append('gc_content')
        # Sequence complexity feature (normalized Shannon entropy)
        features.append('sequence_complexity')
        return features
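
    # Layout of the feature vector implied by _get_feature_names, worked out for
    # the default seq_length=33 (the codon block grows or shrinks with seq_length):
    #   5 base frequencies + 125 3-mer frequencies
    #   + (33 // 3) * 64 = 704 one-hot positional codon indicators
    #   + gc_content + sequence_complexity
    #   = 836 features in total.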

    def trim_sequence(self, seq, target_length):
        """
        Trim a sequence symmetrically from both ends down to the target length.

        Args:
            seq: original sequence
            target_length: desired length

        Returns:
            The trimmed sequence.
        """
        if len(seq) <= target_length:
            return seq
        # Trim the same amount from each end so the centre of the sequence is preserved.
        excess = len(seq) - target_length
        trim_each_side = excess // 2
        # Slice to exactly target_length characters (also handles an odd excess).
        return seq[trim_each_side:trim_each_side + target_length]

    def _preprocess_sequence(self, sequence):
        """
        Convert a DNA sequence into a feature vector ordered like self.feature_names.

        Args:
            sequence: DNA sequence

        Returns:
            feature_vector: list of feature values
        """
        try:
            feature_names = self.feature_names
            if pd.isna(sequence):
                sequence = ''
            elif not isinstance(sequence, str):
                sequence = str(sequence)
            sequence = sequence.upper().replace('U', 'T')  # normalize to upper-case DNA
            # Trim or pad with 'N' so the sequence has exactly the target length.
            if len(sequence) > self.seq_length:
                sequence = self.trim_sequence(sequence, self.seq_length)
            else:
                sequence = sequence.ljust(self.seq_length, 'N')
            # Base composition
            features = {base: sequence.count(base) / self.seq_length
                        for base in ['A', 'T', 'G', 'C', 'N']}
            # 3-mer frequencies over the 5-letter alphabet (125 k-mers)
            kmer_features = {''.join(p): 0
                             for p in itertools.product(['A', 'T', 'G', 'C', 'N'], repeat=3)}
            n_windows = max(1, self.seq_length - 2)
            for i in range(self.seq_length - 2):
                kmer = sequence[i:i + 3]
                if kmer in kmer_features:
                    kmer_features[kmer] += 1
            kmer_features = {k: v / n_windows for k, v in kmer_features.items()}
            # One-hot positional codon features (codons containing 'N' map to all zeros)
            codon_features = {}
            codons = [''.join(p) for p in itertools.product(['A', 'T', 'G', 'C'], repeat=3)]
            n_codons = self.seq_length // 3  # number of complete codons in the sequence
            for i in range(n_codons):
                pos_start = i * 3
                current_codon = sequence[pos_start:pos_start + 3]
                for codon in codons:
                    codon_features[f'codon_pos_{i}_{codon}'] = (
                        1 if current_codon == codon and 'N' not in current_codon else 0
                    )
            # GC content computed over non-N bases only
            valid_bases = [b for b in sequence if b != 'N']
            gc_content = ((valid_bases.count('G') + valid_bases.count('C')) / len(valid_bases)
                          if valid_bases else 0)
            # Sequence complexity: Shannon entropy of the non-N base distribution
            valid_counts = Counter(valid_bases)
            total_valid = sum(valid_counts.values())
            entropy = 0.0
            for cnt in valid_counts.values():
                p = cnt / total_valid
                entropy += -p * np.log2(p)
            entropy /= np.log2(4)  # normalize to the 0-1 range
            # Merge all features and order them consistently with feature_names.
            all_features = {**features, **kmer_features, **codon_features}
            all_features['gc_content'] = gc_content
            all_features['sequence_complexity'] = entropy
            feature_vector = [all_features.get(f, 0.0) for f in feature_names]
            return feature_vector
        except Exception as e:
            raise ValueError(f"Feature extraction failed: {str(e)}")

    def extract_features_batch(self, sequences: List[Union[str, float]]) -> np.ndarray:
        """
        Extract features for a batch of sequences.

        Args:
            sequences: list of DNA sequences

        Returns:
            np.ndarray: feature matrix, one row per sequence
        """
        try:
            return np.array([self.extract_features(seq) for seq in sequences])
        except Exception as e:
            raise ValueError(f"Batch feature extraction failed: {str(e)}")

    def predict_region_batch(self, data: pd.DataFrame, gb_threshold: float = 0.1) -> pd.DataFrame:
        """
        Run region prediction for a batch of sequences.

        Args:
            data: DataFrame containing the '33bp' and '399bp' columns
            gb_threshold: probability threshold for the GB model (default 0.1)

        Returns:
            DataFrame: prediction results, one row per input sequence
        """
        results = []
        for idx, row in data.iterrows():
            try:
                # Make sure the sequences are strings.
                seq_33bp = str(row['33bp'])
                seq = str(row['399bp'])
                # Convert both sequences into feature vectors of the expected length.
                seq_33bp = self._preprocess_sequence(seq_33bp)
                seq = self._preprocess_sequence(seq)
                # predict_region is expected to be supplied by the predictor that uses this extractor.
                result = self.predict_region(seq_33bp, seq, gb_threshold)
                # Carry over the remaining columns of the input row.
                for col in data.columns:
                    if col not in ['33bp', '399bp']:
                        result[col] = row[col]
                results.append(result)
            except Exception as e:
                print(f"Error while processing the sequence at index {idx}: {str(e)}")
                continue
        return pd.DataFrame(results)

    def extract_features(self, sequence: str) -> list:
        """
        Extract the feature vector for a single sequence.

        Args:
            sequence: DNA sequence

        Returns:
            list: feature vector
        """
        try:
            # Make sure the input is a string.
            if not isinstance(sequence, str):
                sequence = str(sequence)
            # Upper-case and replace U with T.
            sequence = sequence.upper().replace('U', 'T')
            # Trim or pad with 'N' so the sequence has exactly the target length.
            if len(sequence) > self.seq_length:
                sequence = self.trim_sequence(sequence, self.seq_length)
            elif len(sequence) < self.seq_length:
                sequence = sequence.ljust(self.seq_length, 'N')
            features = []
            try:
                # Base frequencies
                for base in ['A', 'T', 'G', 'C', 'N']:
                    features.append(sequence.count(base) / len(sequence))
                # 3-mer frequencies over the 5-letter alphabet
                for kmer in [''.join(p) for p in itertools.product(['A', 'T', 'G', 'C', 'N'], repeat=3)]:
                    count = 0
                    for i in range(len(sequence) - 2):
                        if sequence[i:i + 3] == kmer:
                            count += 1
                    features.append(count / max(1, len(sequence) - 2))
                # One-hot positional codon features
                codons = [''.join(p) for p in itertools.product(['A', 'T', 'G', 'C'], repeat=3)]
                n_codons = len(sequence) // 3
                for i in range(n_codons):
                    pos_start = i * 3
                    current_codon = sequence[pos_start:pos_start + 3]
                    for codon in codons:
                        features.append(1 if current_codon == codon and 'N' not in current_codon else 0)
                # GC content over non-N bases
                valid_bases = [b for b in sequence if b != 'N']
                gc_content = ((valid_bases.count('G') + valid_bases.count('C')) / len(valid_bases)
                              if valid_bases else 0)
                features.append(gc_content)
                # Sequence complexity: normalized Shannon entropy of the non-N bases
                valid_counts = Counter(valid_bases)
                total_valid = sum(valid_counts.values())
                entropy = 0.0
                if total_valid > 0:  # avoid division by zero for all-N sequences
                    for cnt in valid_counts.values():
                        p = cnt / total_valid
                        entropy += -p * np.log2(p)
                    entropy /= np.log2(4)  # normalize to the 0-1 range
                features.append(entropy)
                return features
            except Exception as e:
                print(f"Error while computing features: {str(e)}")
                # On failure return an all-zero vector of the expected length.
                expected_length = 5 + 125 + (self.seq_length // 3) * 64 + 2
                return [0.0] * expected_length
        except Exception as e:
            print(f"Feature extraction failed: {str(e)}")
            # Return an empty list; the caller must handle this case.
            return []
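

# A minimal usage sketch, assuming only the class defined above; the 33 bp toy
# sequence is an illustrative placeholder rather than data shipped with FScanpy.
if __name__ == "__main__":
    extractor = SequenceFeatureExtractor(seq_length=33)
    toy_seq = "ATGGCGTACGATCGATCGATCGTAGCTAGCTAA"  # 33 bp example sequence
    vector = extractor.extract_features(toy_seq)
    # The vector length should match the declared feature names (836 for seq_length=33).
    print(len(vector), len(extractor.feature_names))
    # Batch extraction stacks one feature vector per input sequence.
    matrix = extractor.extract_features_batch([toy_seq, toy_seq[::-1]])
    print(matrix.shape)  # (2, 836)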