
# Python 时间序列预测
## Prophet 用于业务预测
from prophet import Prophet
import pandas as pd
import numpy as np
# Prophet expects a 'ds' (datetime) column and a 'y' (value) column.
df = pd.read_csv("sales.csv")
df = df.rename(columns={"date": "ds", "sales": "y"})
df["ds"] = pd.to_datetime(df["ds"])

# Build the holiday table BEFORE constructing the model so it can be passed
# to the single Prophet(...) call below. Constructing a second
# Prophet(holidays=...) afterwards would silently discard every
# hyper-parameter and the custom seasonality configured here.
from prophet.make_holidays import make_holidays_df
# NOTE(review): the year list is hard-coded; confirm it covers both the
# history in sales.csv and the 90-day forecast horizon.
holidays = make_holidays_df(year_list=[2024, 2025, 2026], country="US")

model = Prophet(
    holidays=holidays,
    changepoint_prior_scale=0.05,  # trend flexibility
    seasonality_prior_scale=10,  # seasonality strength
    holidays_prior_scale=10,
    seasonality_mode="multiplicative",
    yearly_seasonality=True,
    weekly_seasonality=True,
    daily_seasonality=False,
)
# Add a custom monthly seasonality (must be registered before fit()).
model.add_seasonality(name="monthly", period=30.5, fourier_order=5)
model.fit(df)

# Forecast 90 periods past the end of the history.
future = model.make_future_dataframe(periods=90)
forecast = model.predict(future)

# Evaluate with rolling-origin cross-validation.
from prophet.diagnostics import cross_validation, performance_metrics
cv_results = cross_validation(model, initial="365 days", period="30 days", horizon="90 days")
metrics = performance_metrics(cv_results)
print(f"MAPE: {metrics['mape'].mean():.4f}")
print(f"MAE: {metrics['mae'].mean():.2f}")

## ARIMA/SARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.stattools import adfuller
import warnings
warnings.filterwarnings("ignore")
def check_stationarity(series: pd.Series) -> dict:
    """Run the Augmented Dickey-Fuller test on *series*.

    Missing values are dropped before the test is applied.

    Returns:
        A dict with the keys ``"p_value"``, ``"is_stationary"`` (true when
        the p-value is below 0.05) and ``"adf_statistic"``.
    """
    # adfuller returns a tuple whose first two items are the test statistic
    # and the p-value; the remaining items are not used here.
    adf_statistic, p_value = adfuller(series.dropna())[:2]
    return dict(
        p_value=p_value,
        is_stationary=p_value < 0.05,
        adf_statistic=adf_statistic,
    )
def find_best_sarima(series: pd.Series, seasonal_period: int = 12) -> tuple:
    """Grid-search the non-seasonal SARIMA order ``(p, d, q)`` by AIC.

    Every combination of p in 0..2, d in 0..1 and q in 0..2 is fitted with a
    fixed seasonal order ``(1, 1, 1, seasonal_period)``; the combination with
    the lowest AIC wins.

    Args:
        series: The observations to fit.
        seasonal_period: Number of periods in one seasonal cycle.

    Returns:
        ``(best_order, best_aic)``. When no candidate could be fitted the
        result is ``(None, float("inf"))`` -- callers must check for ``None``
        before passing the order to ``SARIMAX``.
    """
    import itertools
    import logging

    log = logging.getLogger(__name__)
    seasonal_order = (1, 1, 1, seasonal_period)
    best_aic, best_order = float("inf"), None

    for order in itertools.product(range(3), range(2), range(3)):
        try:
            fit = SARIMAX(series, order=order, seasonal_order=seasonal_order).fit(disp=False)
        except Exception as exc:
            # Best-effort search: a candidate that cannot be fitted is
            # skipped, but the reason is recorded instead of vanishing.
            log.debug("SARIMA order %s could not be fitted: %s", order, exc)
            continue
        if fit.aic < best_aic:
            best_aic, best_order = fit.aic, order

    return best_order, best_aic
# Fit the best model found by the grid search.
order, aic = find_best_sarima(df["y"])
if order is None:
    # find_best_sarima returns None when every candidate failed to fit;
    # fail here with a clear message rather than inside SARIMAX.
    raise RuntimeError("No SARIMA candidate could be fitted; check the input series.")
model = SARIMAX(df["y"], order=order, seasonal_order=(1, 1, 1, 12))
fitted = model.fit(disp=False)

# Forecast 90 steps ahead. The forecast is computed once and both the point
# forecast and its confidence interval are read from the same result.
forecast_result = fitted.get_forecast(steps=90)
forecast = forecast_result.predicted_mean
conf_int = forecast_result.conf_int()

## LSTM 用于复杂模式
import torch
import torch.nn as nn
import numpy as np
from sklearn.preprocessing import MinMaxScaler
class TimeSeriesLSTM(nn.Module):
    """Stacked LSTM followed by a linear head applied to the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of each LSTM layer.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of values produced per input sequence.
        dropout: Dropout applied between LSTM layers. It is forced to 0 when
            ``num_layers`` is 1, because there is no inter-layer connection
            to drop and PyTorch warns about a non-zero value in that case.
    """

    def __init__(self, input_size=1, hidden_size=64, num_layers=2, output_size=1, dropout=0.2):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0.0,
        )
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Map a batch-first input ``x`` to one prediction per sequence."""
        out, _ = self.lstm(x)
        # Only the output at the final time step feeds the linear head.
        return self.fc(out[:, -1, :])
def prepare_sequences(data: np.ndarray, seq_len: int = 30):
    """Slice *data* into sliding windows and their next-step targets.

    Window ``i`` is ``data[i:i + seq_len]`` and its target is
    ``data[i + seq_len]``.

    Args:
        data: Array-like whose first axis is time.
        seq_len: Window length; must be at least 1.

    Returns:
        ``(X, y)`` where ``X`` has shape ``(n, seq_len, *data.shape[1:])`` and
        ``y`` has shape ``(n, *data.shape[1:])`` with
        ``n = max(len(data) - seq_len, 0)``. When the input is too short the
        arrays are empty but keep those trailing dimensions.

    Raises:
        ValueError: If ``seq_len`` is smaller than 1.
    """
    if seq_len < 1:
        raise ValueError(f"seq_len must be >= 1, got {seq_len}")

    data = np.asarray(data)
    n_windows = len(data) - seq_len
    tail_shape = data.shape[1:]
    if n_windows <= 0:
        # Not enough observations for a single window: return empty arrays
        # that still carry the expected trailing dimensions.
        return (
            np.empty((0, seq_len) + tail_shape, dtype=data.dtype),
            np.empty((0,) + tail_shape, dtype=data.dtype),
        )

    X = np.stack([data[start:start + seq_len] for start in range(n_windows)])
    # The targets are exactly the observations that follow the first window;
    # copy so the result does not alias the caller's array.
    y = data[seq_len:].copy()
    return X, y
# Scale the target column with MinMaxScaler before windowing it.
scaler = MinMaxScaler()
scaled = scaler.fit_transform(df[["y"]].to_numpy())

SEQ_LEN = 30
X, y = prepare_sequences(scaled, SEQ_LEN)
X_tensor = torch.as_tensor(X, dtype=torch.float32)
y_tensor = torch.as_tensor(y, dtype=torch.float32)

model = TimeSeriesLSTM()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.HuberLoss()

# Full-batch training: every epoch is one optimizer step on all windows.
# Nothing in the loop leaves training mode, so it is enabled once up front.
model.train()
for epoch in range(100):
    optimizer.zero_grad()
    loss = criterion(model(X_tensor), y_tensor)
    loss.backward()
    optimizer.step()
    if not epoch % 10:
        print(f"Epoch {epoch}: {loss.item():.6f}")

## Temporal Fusion Transformer
from pytorch_forecasting import TemporalFusionTransformer, TimeSeriesDataSet
from pytorch_forecasting.metrics import SMAPE, QuantileLoss
# GroupNormalizer is used for target_normalizer below; without this import
# the TimeSeriesDataSet construction fails with a NameError.
from pytorch_forecasting.data import GroupNormalizer

# Prepare the data.
# NOTE(review): df_training is not defined in this snippet; it is assumed to
# be a DataFrame containing the columns referenced below -- confirm.
max_encoder_length = 60
max_prediction_length = 20
training = TimeSeriesDataSet(
    df_training,
    time_idx="time_idx",
    target="sales",
    group_ids=["store", "product"],
    max_encoder_length=max_encoder_length,
    max_prediction_length=max_prediction_length,
    static_categoricals=["store"],
    static_reals=["avg_price"],
    time_varying_known_reals=["time_idx", "price", "is_holiday"],
    time_varying_unknown_reals=["sales"],
    target_normalizer=GroupNormalizer(groups=["store", "product"], transformation="softplus"),
)
# Build the model with dimensions derived from the dataset definition.
tft = TemporalFusionTransformer.from_dataset(
    training,
    learning_rate=0.03,
    hidden_size=32,
    attention_head_size=2,
    dropout=0.1,
    hidden_continuous_size=16,
    loss=QuantileLoss(),
)
## 预测评估
from sklearn.metrics import mean_absolute_error, mean_squared_error
def evaluate_forecast(actual: np.ndarray, predicted: np.ndarray) -> dict:
    """Compute MAE, RMSE, MAPE and sMAPE between two aligned sequences.

    Both inputs are converted to flat float arrays first, so values are
    always compared by position. Without that, pandas Series with different
    indexes would be aligned by label and a ``(n,)`` vs ``(n, 1)`` pair would
    broadcast to ``(n, n)`` in the percentage metrics.

    Args:
        actual: Observed values.
        predicted: Forecast values, same number of elements as *actual*.

    Returns:
        A dict with the keys ``"MAE"``, ``"RMSE"``, ``"MAPE"`` and
        ``"sMAPE"``; the last two are percentages.

    Raises:
        ValueError: If the inputs do not contain the same number of elements.
    """
    actual = np.asarray(actual, dtype=float).ravel()
    predicted = np.asarray(predicted, dtype=float).ravel()
    if actual.shape != predicted.shape:
        raise ValueError(
            f"actual and predicted must have the same number of elements, "
            f"got {actual.size} and {predicted.size}"
        )

    mae = mean_absolute_error(actual, predicted)
    rmse = np.sqrt(mean_squared_error(actual, predicted))

    abs_error = np.abs(actual - predicted)
    # Zero actuals are replaced by 1 in the denominator to avoid dividing
    # by zero; the error for those points is then counted as-is.
    mape = np.mean(abs_error / np.where(actual == 0, 1, np.abs(actual))) * 100
    # The small epsilon keeps the ratio finite when both values are zero.
    smape = 100 * np.mean(2 * abs_error / (np.abs(actual) + np.abs(predicted) + 1e-8))
    return {"MAE": mae, "RMSE": rmse, "MAPE": mape, "sMAPE": smape}
# Compare the models on the same hold-out targets.
# NOTE(review): prophet_preds, sarima_preds, lstm_preds and y_test are not
# defined in this snippet -- they must be provided by the caller.
models = {
    "Prophet": prophet_preds,
    "SARIMA": sarima_preds,
    "LSTM": lstm_preds,
}
for name, preds in models.items():
    metrics = evaluate_forecast(y_test, preds)
    print(f"{name}: MAPE={metrics['MAPE']:.2f}%, RMSE={metrics['RMSE']:.2f}")
## 模型比较
| 模型 | 优势 | 劣势 |
| --- | --- | --- |
| ARIMA | 可解释性强 | 需要平稳性 |
| Prophet | 处理节假日/趋势 | 复杂模式精度较低 |
| LSTM | 捕捉非线性模式 | 需要大量数据 |
| TFT | 最佳精度 | 配置复杂 |