import numpy as np
import pandas as pd
from arch import arch_model
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import LSTM, Dense
# Load the data
data = pd.read_csv('exchange_rate.csv')  # replace with the path to your own data file
exchange_rate = data['exchange_rate'].values
# First-order differencing to work with the (approximately stationary) changes
diff_exchange_rate = np.diff(exchange_rate)
# Fit a GARCH(1,1) model to the differenced series
garch = arch_model(diff_exchange_rate, vol='Garch', p=1, q=1)
garch_fit = garch.fit(disp='off')
# Forecast the variance 30 steps ahead (out-of-sample volatility path)
forecast_volatility = garch_fit.forecast(horizon=30)
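# (Illustrative extraction, not used by the LSTM below; the variable names
# future_variance/future_volatility are ours.) The 30-step-ahead variance path
# sits in the last row of the forecast's .variance DataFrame; its square root
# is the corresponding volatility forecast.
future_variance = forecast_volatility.variance.values[-1]
future_volatility = np.sqrt(future_variance)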
# Prepare the LSTM inputs: pair the in-sample conditional volatility from the
# GARCH fit with the macro indicator (both align with exchange_rate[1:] after
# differencing), and use the features at time t to predict the rate at t+1
features = np.column_stack((garch_fit.conditional_volatility,
                            data['macro_indicator'].values[1:]))
X = features[:-1]
y = exchange_rate[2:].reshape(-1, 1)
# Scale features and target with separate scalers so predictions can be rescaled
x_scaler = MinMaxScaler()
y_scaler = MinMaxScaler()
X_scaled = x_scaler.fit_transform(X)
y_scaled = y_scaler.fit_transform(y)
# Split into training and test sets
train_size = int(len(X_scaled) * 0.8)
X_train, X_test = X_scaled[:train_size], X_scaled[train_size:]
y_train, y_test = y_scaled[:train_size], y_scaled[train_size:]
# Reshape to (samples, timesteps, features) as expected by the LSTM
X_train = np.reshape(X_train, (X_train.shape[0], 1, X_train.shape[1]))
X_test = np.reshape(X_test, (X_test.shape[0], 1, X_test.shape[1]))
# Build the LSTM model
lstm_model = Sequential()
lstm_model.add(LSTM(50, input_shape=(X_train.shape[1], X_train.shape[2])))
lstm_model.add(Dense(1))
lstm_model.compile(loss='mean_squared_error', optimizer='adam')
# Train the LSTM model
lstm_model.fit(X_train, y_train, epochs=100, batch_size=32, verbose=0)
# Predict on the test set
y_pred = lstm_model.predict(X_test)
# Rescale the predictions back to the original exchange-rate units
y_pred = y_scaler.inverse_transform(y_pred)
# Print the predictions
print(y_pred)
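# (Optional sketch, not part of the original listing.) A quick way to check the
# fit: rescale the test targets with the same y_scaler and compare them with the
# predictions via RMSE, so both series are in the original exchange-rate units.
from sklearn.metrics import mean_squared_error
y_test_orig = y_scaler.inverse_transform(y_test)
rmse = np.sqrt(mean_squared_error(y_test_orig, y_pred))
print(f'Test RMSE: {rmse:.4f}')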