告别手动分段!用Python实现Fisher最优分割法,轻松搞定时间序列数据聚类

张开发
2026/4/19 16:20:03 15 分钟阅读

分享文章

告别手动分段!用Python实现Fisher最优分割法,轻松搞定时间序列数据聚类
用Python实现Fisher最优分割法时间序列聚类的实战指南在金融量化交易中我们常常需要分析股票价格的波动模式在工业物联网场景下传感器采集的温度、振动数据也需要分段处理甚至用户行为日志分析也离不开对时间序列的智能切分。传统滑动窗口方法需要人工设定固定长度而突变点检测又过于关注局部异常。有没有一种方法能自动找到最佳分段点同时保持时间顺序的完整性Fisher最优分割法正是为解决这类问题而生。它不像K-means那样打乱数据顺序而是在保持时序的前提下找到使段内差异最小、段间差异最大的分割方案。想象一下你有一整年的股票日线数据Fisher法能自动识别出上涨期、震荡期、下跌期的自然分界点而无需预先设定分段数量。1. 理解Fisher最优分割法的核心思想1.1 什么是有序样本聚类与常规聚类不同有序样本聚类有两个关键约束顺序不可破坏时间戳t100的数据不能与t50的数据分到同一段分段连续性每个分段必须包含连续的时间点这种特性使得它在处理时间序列时格外有用。举个例子分析用户APP使用时长序列时我们可能希望把高频使用期和低频休眠期自动分开同时保持时间先后关系。1.2 衡量分段质量的指标Fisher法使用离差平方和(SSE)作为段内差异的度量def calculate_sse(segment): mean sum(segment) / len(segment) return sum((x - mean)**2 for x in segment)这个值越小说明该段数据越紧凑。最优分割的目标就是找到使总SSE最小的分割方案。1.3 与常见方法的对比方法保持顺序自动确定分段数复杂度适用场景滑动窗口是否O(n)固定周期模式突变点检测是是O(n^2)异常事件识别K-means否否O(kn)无时序关系的聚类Fisher最优分割是是O(kn^2)时序数据的自然分段从表格可以看出Fisher法在保持顺序和自动确定分段数方面具有独特优势特别适合需要保持时间连贯性的分析场景。2. 算法实现的关键步骤2.1 动态规划递推公式Fisher法的核心是一个动态规划问题。定义L[n,k]为前n个点分为k类的最小总SSE则递推关系为L[n,k] min(L[j-1,k-1] D(j,n)) 对于 jk,...,n其中D(j,n)表示从点j到n的SSE。这个公式的意思是要找到前n个点的最优k分割需要在所有可能的分割点j处将问题分解为前j-1个点的k-1分割加上当前段的SSE。2.2 Python实现详解以下是核心函数的实现import numpy as np def compute_segment_variance(X, start, end): 计算子序列X[start:end]的离差平方和 segment X[start:end] mean np.mean(segment) return np.sum((segment - mean)**2) def optimal_partition(X, max_k10): n len(X) # 初始化DP表 L np.zeros((n1, max_k1)) # 边界条件k1时就是整个序列的SSE for i in range(1, n1): L[i,1] compute_segment_variance(X, 0, i) # 动态规划填表 for k in range(2, max_k1): for i in range(k, n1): # 寻找最优分割点j min_cost float(inf) for j in range(k, i1): cost L[j-1,k-1] compute_segment_variance(X, j-1, i) if cost min_cost: min_cost cost L[i,k] min_cost return L提示实际应用中可以使用记忆化存储来优化重复计算特别是当序列较长时。2.3 自动确定最佳分段数通过观察不同k值对应的总SSE变化可以找到拐点作为最佳分段数def find_optimal_k(L): 根据SSE变化曲线确定最佳k值 n, max_k L.shape sse_values [L[-1,k] for k in range(1, max_k)] ratios [sse_values[i]/sse_values[i1] for i in range(len(sse_values)-1)] optimal_k np.argmax(ratios) 2 # 加2是因为从k2开始比较 return optimal_k这种方法类似于肘部法则当增加k带来的SSE下降不再显著时就找到了合理的分段数。3. 实战案例股票价格分段分析3.1 数据准备与预处理假设我们有某股票一年的日收盘价数据import yfinance as yf import matplotlib.pyplot as plt # 获取苹果公司股票数据 data yf.download(AAPL, start2022-01-01, end2022-12-31) prices data[Close].values预处理步骤归一化将价格转换为收益率或标准化值平滑处理使用移动平均消除短期波动处理缺失值线性插值或向前填充3.2 应用Fisher分割# 计算对数收益率 returns np.log(prices[1:]/prices[:-1]) # 寻找最优分割 L optimal_partition(returns) optimal_k find_optimal_k(L) break_points find_break_points(L, optimal_k) # 回溯找到具体分割点 # 可视化结果 plt.figure(figsize(12,6)) plt.plot(prices, labelPrice) for bp in break_points: plt.axvline(xbp, colorr, linestyle--) plt.title(fAAPL Price with {optimal_k} Segments) plt.legend() plt.show()3.3 结果分析与应用分段结果可以用于趋势识别标记上涨/下跌/震荡阶段事件关联检查分段点附近的公司公告或宏观经济事件策略回测在不同阶段应用不同的交易策略例如我们可能发现2022-01至2022-03美联储加息预期下的下跌段2022-04至2022-08财报超预期带来的反弹段2022-09至2022-12宏观经济担忧导致的震荡段4. 高级技巧与优化建议4.1 处理多维时间序列对于多变量情况如同时考虑价格和成交量只需修改SSE计算方式def multivariate_sse(X, start, end): segment X[start:end] mean np.mean(segment, axis0) return np.sum((segment - mean)**2)4.2 加速计算的方法提前终止当SSE下降小于阈值时停止增加k采样优化对长序列先降采样处理并行计算不同k值的计算相互独立from joblib import Parallel, delayed def parallel_optimal_partition(X, max_k10): def compute_for_k(k): # 计算特定k值的DP表列 pass results Parallel(n_jobs-1)(delayed(compute_for_k)(k) for k in range(2,max_k1)) return np.column_stack(results)4.3 常见问题排查分段不合理检查数据是否平稳必要时进行差分处理计算时间过长尝试减少max_k或缩短序列长度所有点分到一段可能是数据方差太小尝试放大特征差异注意实际应用中建议先在小样本上测试参数再应用到全量数据。

更多文章