智算多多



第一次接触AI项目时,我天真地以为只要把数据扔进模型就能得到好结果。直到亲眼看见一个价值百万的项目因为数据问题翻车——模型训练时准确率高达95%,上线后却连50%都达不到。后来排查发现,原始数据里混入了大量测试数据,还有重复标注的样本。这个教训让我明白:数据质量决定模型上限,而预处理决定数据质量。
数据预处理就像盖房子前的地基工程。你可能会花80%的时间在数据清洗、标注和转换上,但这些工作能决定剩下20%的建模工作是否有效。举个例子,自然语言处理中常见的文本数据,原始形态可能是混杂着HTML标签、特殊符号和错别字的"脏数据"。如果不经过清洗就直接喂给模型,效果可想而知。
提示:在实际项目中,数据科学家平均花费60%以上的时间在数据预处理环节,远超过模型构建和调参的时间。
最近处理过一个电商评论情感分析案例。原始数据有三大致命伤:15%的评论是重复的,30%的评论带着"[此评论已被删除]"的标记,还有大量"好评!"、"不错"这样缺乏信息量的短文本。我们用了三周时间进行数据去重、无效内容过滤和样本增强,最终让模型准确率提升了23个百分点。这比调参带来的提升高出一个数量级。
数据清洗就像考古发掘——你得先知道土里埋着什么,才能决定用刷子还是铲子。常见的数据"脏污"包括:
我常用的Python工具包组合是pandas+missingno。下面这个代码片段能快速定位数据问题:
import pandas as pd
import missingno as msno
# 加载数据
df = pd.read_csv('raw_data.csv')
# 可视化缺失值
msno.matrix(df) # 缺失值热力图
msno.bar(df) # 各列缺失数量统计
# 统计基本问题
print(f"重复行数: {df.duplicated().sum()}")
print(f"缺失值占比:\n{df.isnull().mean().round(2)}")
面对缺失值,新手常犯的错误是直接删除。其实有更聪明的处理方法:
df.drop(columns=['无关紧要的列'], inplace=True)
df['年龄'].fillna(df['年龄'].median(), inplace=True)
df['城市'].fillna(df['城市'].mode()[0], inplace=True)
from sklearn.ensemble import RandomForestRegressor
# 构建预测模型(示例代码)
df['收入_缺失'] = df['收入'].isnull().astype(int)
上周处理过一个医疗数据集,血压字段缺失率达40%。直接删除会损失大量样本,简单填充又会引入偏差。最终我们采用KNN算法,根据年龄、性别、病史等特征预测缺失的血压值,既保留了数据规模又减少了偏差。
文本数据预处理最关键的步骤是向量化——把文字转换成模型能理解的数字。最近帮一家律所处理法律合同时,我们对比了三种方法:
from sklearn.feature_extraction.text import CountVectorizer
vectorizer = CountVectorizer(max_features=5000)
X = vectorizer.fit_transform(legal_docs)
from sklearn.feature_extraction.text import TfidfVectorizer
tfidf = TfidfVectorizer(ngram_range=(1,2))
X = tfidf.fit_transform(legal_docs)
import gensim
model = gensim.models.Word2Vec.load('law_word2vec.model')
实测发现,针对法律文本这种专业领域,用领域数据训练的Word2Vec效果最好,准确率比TF-IDF高8%。关键是要根据数据类型选择合适的方法——社交媒体短文本可能更适合TF-IDF,而长专业文档适合词向量。
计算机视觉项目的数据预处理完全是另一套玩法。上个月优化一个工业质检系统时,我们设计了这样的预处理流程:
import cv2
import albumentations as A
# 基础预处理
transform = A.Compose([
A.Resize(256, 256), # 统一尺寸
A.Normalize(), # 归一化
A.GaussNoise(p=0.2), # 数据增强
A.RandomGamma(p=0.2)
])
# 应用变换
def preprocess_image(image_path):
image = cv2.imread(image_path)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
return transform(image=image)['image']
这个流程解决了三个问题:不同相机拍摄的图片尺寸不一、光照条件差异大、缺陷样本数量不足。经过预处理后,模型召回率提升了15%,特别是对小缺陷的检测效果明显改善。
现实中的数据很少乖乖待在CSV里。上周处理政务数据时,我遇到了深度嵌套的JSON:
{
"市民投诉": [
{
"投诉编号": "TS2023001",
"详情": {
"时间": "2023-01-01",
"内容": "噪音扰民",
"处理部门": [
{"名称": "环保局", "联系人": "张科长"},
{"名称": "城管", "联系人": null}
]
}
}
]
}
用pandas的json_normalize可以优雅地展开这种嵌套结构:
from pandas import json_normalize
with open('complaints.json') as f:
data = json.load(f)
# 展开嵌套结构
df = json_normalize(
data['市民投诉'],
meta=['投诉编号'],
record_path=['详情', '处理部门'],
meta_prefix='投诉_'
)
# 处理缺失值
df['联系人'].fillna('未知', inplace=True)
金融数据预处理是另一个重灾区。去年处理股票数据时,我发现不同交易所的交易日历不一致,有的包含周末数据,有的遇到节假日就空缺。解决方案是:
# 创建完整的时间索引
full_dates = pd.date_range(start='2022-01-01', end='2022-12-31', freq='D')
# 重新索引并填充缺失值
df = df.reindex(full_dates)
df['收盘价'].fillna(method='ffill', inplace=True) # 向前填充
df['成交量'].fillna(0, inplace=True) # 成交量缺失视为0
这样处理后的数据才能用于训练时间序列预测模型。关键是要理解业务逻辑——股价在非交易日确实不会变化,但成交量确实为零。
数据预处理完不等于万事大吉。我吃过这样的亏:清洗脚本有个隐蔽的bug,把"未知"性别全转成了"男",导致模型产生严重偏差。现在我会用Great Expectations建立验证关卡:
import great_expectations as ge
# 创建验证套件
df = ge.read_csv('processed_data.csv')
# 定义验证规则
df.expect_column_values_to_not_be_null('用户ID')
df.expect_column_values_to_be_between('年龄', 18, 100)
df.expect_column_values_to_be_in_set('性别', ['男', '女', '其他'])
# 保存并执行验证
validation_result = df.validate()
这套系统能在数据出现问题时报警,比如突然出现大量缺失值,或者某个字段的取值范围异常。我把这些检查点集成到CI/CD流程中,确保进入模型的数据都是干净的。
跨表数据一致性同样重要。在用户画像项目中,我们遇到过用户基础表和交易表统计不一致的问题:
# 检查用户数一致性
base_users = user_df['用户ID'].nunique()
txn_users = txn_df['用户ID'].nunique()
assert base_users == txn_users, f"用户数不一致:基础表{base_users},交易表{txn_users}"
# 检查金额一致性
total_amount = txn_df['金额'].sum()
monthly_sum = summary_df['月销售额'].sum()
np.testing.assert_almost_equal(total_amount, monthly_sum, decimal=2)
这些检查看似简单,但能避免后续分析中的严重错误。有次发现交易总金额比月报少30万,追查发现是部分退货订单没有正确标记状态。
最近完成的电商项目展示了完整的预处理流水线:
# 关键代码片段
def preprocess_text(text):
text = re.sub(r'【.*?】', '', text) # 去除广告
text = emoji.demojize(text) # 表情符号处理
words = jieba.cut(text)
return ' '.join([w for w in words if w not in STOPWORDS])
df['清洗文本'] = df['原始评论'].progress_apply(preprocess_text)
另一个有趣的案例是液晶屏缺陷检测:
# 针对划痕缺陷的预处理
def scratch_preprocess(image):
# 对比度受限自适应直方图均衡化
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
lab[...,0] = clahe.apply(lab[...,0])
image = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
# 锐化增强边缘
kernel = np.array([[0,-1,0], [-1,5,-1], [0,-1,0]])
return cv2.filter2D(image, -1, kernel)
这种针对性预处理让模型对细微划痕的检测率从60%提升到85%。关键在于理解数据特性——普通预处理会模糊掉我们最关心的细微划痕特征。
在时间序列预测项目中,我犯过一个典型错误:在预处理时对整个数据集做标准化。这会导致未来信息泄露到训练集。正确的做法是:
from sklearn.preprocessing import StandardScaler
# 错误做法:整个数据集标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X) # 泄露了未来信息
# 正确做法:仅用训练集统计量
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test) # 使用训练集的均值和方差
同样的问题也存在于特征选择、缺失值填充等环节。我现在会严格遵循以下原则:
处理信用卡欺诈检测数据时,正样本占比不到0.1%。试过多种方法后,我发现组合策略最有效:
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from imblearn.pipeline import Pipeline
# 定义采样策略
over = SMOTE(sampling_strategy=0.1)
under = RandomUnderSampler(sampling_strategy=0.5)
# 创建采样管道
steps = [('over', over), ('under', under)]
pipeline = Pipeline(steps=steps)
# 应用采样
X_resampled, y_resampled = pipeline.fit_resample(X, y)
但要注意,过采样不能滥用。有次对图像数据过度使用SMOTE,导致模型学会了识别人工生成的伪影特征。对于图像数据,更好的做法是使用几何变换等数据增强方法。
传统预处理代码维护成本很高。最近我开始使用PyCaret进行自动化预处理:
from pycaret.classification import *
# 初始化设置
clf = setup(
data=data,
target='类别',
session_id=123,
normalize=True, # 自动标准化
transformation=True, # 自动处理偏态
handle_unknown_categorical=True, # 自动处理新类别
unknown_categorical_method='most_frequent'
)
# 比较不同预处理后的模型效果
best_model = compare_models()
这种工具能自动尝试多种预处理组合,并评估每种组合对模型效果的影响。在快速原型阶段特别有用,可以节省大量调参时间。
当数据量达到TB级时,单机处理就不够用了。去年处理电信用户行为数据时,我们转向了PySpark:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Imputer, StandardScaler
# 创建Spark会话
spark = SparkSession.builder.appName("TelcoPreprocessing").getOrCreate()
# 读取大数据
df = spark.read.parquet("hdfs://telco_data/*.parquet")
# 分布式处理
imputer = Imputer(inputCols=["age", "income"],
outputCols=["age_imputed", "income_imputed"])
df = imputer.fit(df).transform(df)
scaler = StandardScaler(inputCol="income_imputed",
outputCol="income_scaled")
df = scaler.fit(df).transform(df)
# 保存结果
df.write.parquet("hdfs://processed_data/")
关键优势是处理100GB数据和1TB数据的时间差异不大,都能在20分钟内完成。对于真正的大数据项目,这种可扩展性至关重要。