你好,哈布尔!
今天,我们将以简短的格式熟悉 TSFresh 库。
TSFresh 承担两项主要任务:
-
特征提取:该函数为指定的时间序列生成大量统计数据。其中,使用了计算特定特征的所谓函数。例如,自相关、熵、零交集的数量。
extract_features()
FeatureCalculators
-
特征选择:该函数执行统计测试以选择真正相关的特征,这有助于避免过度拟合并降低维度。
select_features()
主要特点是 TSFresh 允许您以所谓的长格式处理数据,其中每行对应一个维度,并且时间序列标识符设置为 。column_id
让我们看一个从时间序列中提取特征的简单示例:
import pandas as pd
from tsfresh import extract_features, select_features
from tsfresh.utilities.dataframe_functions import impute
# Создаём тестовый DataFrame с данными временного ряда
df = pd.DataFrame({
'id': [1]*100,
'time': range(100),
'value': [i + (i % 10)*5 for i in range(100)]
})
# Извлекаем признаки
extracted_features = extract_features(df, column_id="id", column_sort="time")
# Обрабатываем NaN значения, если они есть
impute(extracted_features)
print("Извлечённые признаки:")
print(extracted_features.head())
Извлечённые признаки:
value__abs_energy value__agg_autocorrelation__lag_1 value__mean ... value__number_peaks
id
1 1234.56 0.987 45.67 ... 3
在几行代码中,我们得到了一个 DataFrame,其中包含为我们的系列计算的数百个特征。
数据格式和特征提取设置
要使 TSFresh 正常工作,数据必须采用长格式。这是什么意思?每个时间序列必须具有唯一的标识符、时间戳和值。如果数据以宽格式显示(其中每列都是一个单独的特征),则必须将其转换为长格式。column_id
column_sort
column_value
长格式数据示例:
# Пример DataFrame в длинном формате
df_long = pd.DataFrame({
'id': [1, 1, 1, 2, 2, 2],
'time': [1, 2, 3, 1, 2, 3],
'value': [10, 15, 14, 20, 25, 24]
})
调用时,您可以设置以下参数:extract_features()
-
column_id
是时间序列的标识符。 -
column_sort
是执行排序所依据的时间戳。 -
column_kind
- 如果同一数据集中有多个值类型。 -
column_value
是具有值本身的列。
TSFresh 还具有一组内置参数,默认情况下,这些参数可计算 60 多个功能类别。但是,通常需要为特定任务配置一组特征。为此,您可以使用对象,也可以使用 .EfficientFCParameters
KindToFCParameters
设置参数示例:
from tsfresh.feature_extraction.settings import EfficientFCParameters
# Используем набор эффективных параметров для быстрого извлечения признаков
extraction_settings = EfficientFCParameters()
# Извлечение признаков с заданными настройками
features = extract_features(
df,
column_id="id",
column_sort="time",
default_fc_parameters=extraction_settings
)
impute(features)
print("Настроенные признаки:")
print(features.head())
如果你只想保留相关特征(例如,自相关和熵),你可以使用必要的参数创建自定义字典:
custom_fc_parameters = {
"value": {
"autocorrelation": [{"lag": 1}],
"approximate_entropy": [{"m": 2, "r": 0.3}],
# Добавляем другие необходимые функции...
}
}
features_custom = extract_features(
df,
column_id="id",
column_sort="time",
default_fc_parameters=custom_fc_parameters
)
impute(features_custom)
print("Кастомные признаки:")
print(features_custom.head())
Кастомные признаки:
value__autocorrelation value__approximate_entropy
id
1 0.950000 0.450000
与 scikit-learn 和管道集成
使用 TSFreshFeatureExtractor 和 TSFreshRelevantFeatureExtractor 类,您可以将自动特征提取嵌入到标准 ML 管道中。
示例:首先我们提取特征,然后将它们标准化并提交给分类器:
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from tsfresh.transformers import TSFreshFeatureExtractor
# Предположим, что у нас есть DataFrame с временными рядами в длинном формате
# и метками для классификации
df_ts = pd.DataFrame({
'id': [1]*50 + [2]*50,
'time': list(range(50))*2,
'value': [i + (i % 5) for i in range(50)] + [i - (i % 3) for i in range(50)]
})
y = [0, 1] # Метки классов для двух временных рядов
# Создаём пайплайн
pipeline = Pipeline([
('tsfresh', TSFreshFeatureExtractor(column_id="id", column_sort="time")),
('scaler', StandardScaler()),
('clf', RandomForestClassifier(n_estimators=100, random_state=42))
])
# Применяем fit_transform() для обучения
pipeline.fit(df_ts, y)
# Получаем предсказания
predictions = pipeline.predict(df_ts)
print("Предсказания классификатора:")
print(predictions)
Предсказания классификатора:
[0 1]
管道提取两个时间序列(id 1 和 2)的特征,对其进行扩展,并训练分类器。在此示例中,模型预测第一个序列的类为 0,第二个序列的类为 1。
要素筛选和自定义要素
特征过滤机制
假设您从时间序列中提取了 500 个特征。其中,10 个是有用的。其余的会让您产生噪音、过度拟合、减慢速度,并且通常会干扰生活。这就是为什么 TSFresh 立即让我们使用内置函数的原因 — 它只选择那些对目标变量具有统计意义的特征。select_features()
不同的测试工作:
-
f_classif
— 用于具有连续特征的分类。 -
chi2
— 用于分类。 -
mutual_info_classif
— 如果你想要一些非标准的东西。 -
Kendall
,用于排名关联。Spearman
但基本上,一切都是自动为您完成的。
from tsfresh import extract_features, select_features
from tsfresh.utilities.dataframe_functions import impute
import pandas as pd
import numpy as np
# Создаём игрушечный датасет с двумя временными рядами
df = pd.DataFrame({
'id': [1]*50 + [2]*50,
'time': list(range(50))*2,
'value': np.concatenate([
np.sin(np.linspace(0, 10, 50)) + np.random.normal(0, 0.1, 50),
np.random.normal(0, 1, 50)
])
})
# Целевая переменная (например, к какому классу относится временной ряд)
y = pd.Series([1, 0], index=[1, 2]) # 1 - синус, 0 - шум
# Извлекаем фичи
features = extract_features(df, column_id="id", column_sort="time")
impute(features)
# Фильтруем признаки по значимости
filtered_features = select_features(features, y)
print("Отобранные признаки:")
print(filtered_features.head())
select_features()
将仅返回那些实际上与 统计相关的特征。y
创建自定义特征
有时 TSFresh 不知道你的域。你知道。然后你可以(并且应该)编写自己的功能。TSFresh 通过两个装饰器支持自定义函数:
-
@set_property(...)
— 指出了有关该功能的元信息。 -
@aggregate_feature(...)
— 注册功能本身。
假设你想要一个计算尖峰数量的功能 — 值超过邻居并且是标准差两倍的地方。这已经更高了。现在 — 如何将其构建成一个成熟的任务:extract_features()
# 1. Определяем кастомную фичу
from tsfresh.feature_extraction.feature_calculators import set_property, aggregate_feature
@set_property("fctype", "simple")
@aggregate_feature("value")
def count_peaks_above_2std(x):
"""Количество пиков, превышающих 2 стандартных отклонения."""
if len(x) x[i-1] and x[i] > x[i+1] and x[i] > threshold]
return len(peaks)
# 2. Добавляем её в словарь кастомных фичей
from tsfresh.feature_extraction import extract_features
from tsfresh.utilities.dataframe_functions import impute
# Кастомный конфиг, где добавлена наша функция
from tsfresh.feature_extraction.settings import ComprehensiveFCParameters
custom_fc_parameters = {
"value": {
"count_peaks_above_2std": None # функция без параметров
}
}
# 3. Применяем на датафрейме
features = extract_features(
df,
column_id="id",
column_sort="time",
default_fc_parameters=custom_fc_parameters
)
impute(features)
print("Кастомная фича на данных:")
print(features.head())
您可以使用自己的自定义功能和任何内置功能。自定义函数可以具有参数、类型、验证器(例如,仅适用于浮点数,或者仅当行长于 10 时)。
使用大数据、滚动窗口和并发
在处理大量数据(例如 100k 用户的时间序列)时,正确设置并发和内存管理非常重要。TSFresh 允许您使用三个参数来执行此作:
-
n_jobs
允许您在多个处理器内核之间分配计算。例如,如果您有 8 个内核,则安装可以显著减少处理时间。但是,请记住,过多的内核会导致过载,尤其是在并发进程争夺内存时。n_jobs=8
-
disable_progressbar
额外的进度条输出可能会减慢速度。将此参数设置为 True 会删除视觉指示器,从而释放用于计算的资源。 -
chunksize
有助于优化 RAM 使用。如果数据非常大,则可以合理地指定(例如,或更少)TSFresh 以块的形式处理数据块,而无需一次将整个样本加载到内存中。chunksize=1000
示例设置:
# Извлечение признаков с оптимизированными параметрами для больших данных:
features = extract_features(
df_large, # Ваш большой DataFrame
column_id="id",
column_sort="time",
n_jobs=4, # Распараллеливание на 4 ядрах
disable_progressbar=True, # Выключение прогресс-бара
chunksize=1000 # Обработка порциями по 1000 строк
)
impute(features) # Не забываем обработать NaN значения
print("Оптимизированные признаки:")
print(features.head())
当预测或分析任务取决于时间序列的局部特征时,滚动窗口也是一个很好的工具。TSFresh 具有允许您创建新的数据滚动子样本的功能。例如,如果每个用户有 200 个时间点的数据,则可以应用最大偏移量为 50 个时间单位的窗口来获取每个间隔的聚合特征。roll_time_series()
让我们看一个例子:
import pandas as pd
import numpy as np
from tsfresh.utilities.dataframe_functions import roll_time_series
from tsfresh import extract_features
from tsfresh.utilities.dataframe_functions import impute
# Допустим, у нас есть большой DataFrame с временными рядами
df_large = pd.DataFrame({
'id': [i for i in range(1, 101) for _ in range(200)],
'time': list(range(200)) * 100,
'value': np.random.randn(200 * 100)
})
# Применяем rolling окна с максимальным сдвигом в 50 единиц времени.
rolled_df = roll_time_series(
df_large,
column_id="id",
column_sort="time",
max_timeshift=50
)
# Теперь извлекаем признаки для каждого окна с использованием оптимизированных настроек
features_rolled = extract_features(
rolled_df,
column_id="id",
column_sort="time",
n_jobs=4, # Распараллеливание вычислений
disable_progressbar=True,
chunksize=1000 # Обработка порциями для экономии RAM
)
impute(features_rolled)
print("Признаки после применения rolling-окон:")
print(features_rolled.head())
该函数为每个时间序列生成滑动窗口,从而允许您分析数据中的局部变化。roll_time_series()
有时,您希望将特征提取、扩展和模型训练结合在一个链式流程中。将该类与标准 scikit-learn 组件结合使用,您可以构建用于预测任务的管道。从好的方面来说,您可以使用滚动窗口来创建滞后特征。TSFreshFeatureExtractor
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
from tsfresh.transformers import TSFreshFeatureExtractor
from tsfresh.utilities.dataframe_functions import roll_time_series
# Генерируем данные для forecasting: 100 временных рядов, 150 точек каждый
df_forecast = pd.DataFrame({
'id': [i for i in range(1, 101) for _ in range(150)],
'time': list(range(150)) * 100,
'value': np.random.randn(150 * 100)
})
# Применяем roll_time_series для создания лаговых признаков (окно с максимальным сдвигом в 20)
rolled_forecast = roll_time_series(
df_forecast,
column_id="id",
column_sort="time",
max_timeshift=20
)
# Создаем пайплайн для прогнозирования
forecast_pipeline = Pipeline([
('tsfresh', TSFreshFeatureExtractor(
column_id="id",
column_sort="time",
n_jobs=4, # Распараллеливание вычислений
disable_progressbar=True,
chunksize=500 # Обработка порциями для больших данных
)),
('scaler', StandardScaler()),
('regressor', LinearRegression())
])
# Предположим, что таргет – одно значение на временной ряд
y_forecast = np.random.randn(rolled_forecast['id'].nunique())
# Обучаем пайплайн и получаем прогнозы
forecast_pipeline.fit(rolled_forecast, y_forecast)
preds = forecast_pipeline.predict(rolled_forecast)
print("Прогнозы модели (первые 10 значений):")
print(preds[:10])
生成 100 个时间序列的数据。应用滚动窗口,创建最大偏移为 20 的滞后特征。收集一个管道,该管道首先通过 TSFresh 提取特征,然后缩放它们并训练线性回归。使用参数优化大数据的工作。n_jobs
disable_progressbar
chunksize
所有最新的 DS 和 ML 方法和工具都可以在 OTUS 在线课程中掌握:在目录中,您可以查看所有程序的列表,在 日历中,您可以注册公开课。