Original post: https://www.joinquant.com/view/community/detail/2e996d21c95a409ba23a651085ab9f61
1 Data source: gold main-contract data, from JQData
2 Data cleaning
3 Two prediction experiments using the gold main-contract data
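Dataset: 70% is used as the training set to fit the model; the remaining 30% is the test set.
Model: an LSTM built with the Keras framework, used to predict the closing price.
LSTM (Long Short-Term Memory) is a type of recurrent neural network, RNN (Recurrent Neural Network).
Results: all results shown are on the test set. test is the true closing price in the test set, and pred is the closing price predicted by the model.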
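The 70/30 split described above can be sketched as follows. This is a minimal illustration, assuming the split is chronological (earliest 70% for training, latest 30% for testing), which the post does not state explicitly. The helper name chronological_split and the toy array are hypothetical; only data_train and data_test appear in the original code.

```python
import numpy as np

def chronological_split(data, train_ratio=0.7):
    """Split a time-ordered array into train and test parts without shuffling."""
    split_at = int(len(data) * train_ratio)
    return data[:split_at], data[split_at:]

# Toy stand-in for the cleaned price table: 10 rows, 1 column (assumption).
data = np.arange(10, dtype=float).reshape(-1, 1)
data_train, data_test = chronological_split(data)
print(data_train.shape, data_test.shape)
```

Keeping the rows in time order means every test sample comes after every training sample, so the model is never evaluated on a period it has already seen.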
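Experiment 1:
Use the closing prices of the previous 5 time steps
to predict the closing price at the current time step.
Each input sample contains 5 steps, each step holding one closing price, and the output is one-dimensional, i.e. [None, 5, 1] => [None, 1]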
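To make the [None, 5, 1] => [None, 1] mapping concrete, here is a small sketch of the sliding-window construction on a toy series. The helper name make_windows and the toy values are hypothetical; the slicing mirrors the list comprehensions used in the experiment code.

```python
import numpy as np

def make_windows(series, seq_len):
    """Return (X, y): X[i] holds seq_len consecutive values, y[i] is the value right after them."""
    n = len(series) - seq_len
    X = np.array([series[i : i + seq_len] for i in range(n)])[:, :, np.newaxis]
    y = np.array([series[i + seq_len] for i in range(n)])
    return X, y

closes = np.arange(8, dtype=float)  # toy closing prices 0..7 (assumption)
X, y = make_windows(closes, seq_len=5)
print(X.shape, y.shape)   # (3, 5, 1) (3,)
print(X[0, :, 0], y[0])   # [0. 1. 2. 3. 4.] 5.0
```

A series of length N yields N - seq_len samples, because the last seq_len values have no later value left to serve as a target.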
import numpy as np
import matplotlib.pyplot as plt
from keras.layers import Input, Dense, LSTM
from keras.models import Model
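output_dim = 1     # one predicted value per sample: the closing price
batch_size = 256
epochs = 10
seq_len = 5        # number of past time steps in each input sample
hidden_size = 128  # number of LSTM units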
# Build sliding windows; column 0 of data_train / data_test is used as the closing price.
# X: [samples, seq_len, 1], y: the value immediately after each window.
X_train = np.array([data_train[i : i + seq_len, 0] for i in range(data_train.shape[0] - seq_len)])[:, :, np.newaxis]
y_train = np.array([data_train[i + seq_len, 0] for i in range(data_train.shape[0] - seq_len)])
X_test = np.array([data_test[i : i + seq_len, 0] for i in range(data_test.shape[0] - seq_len)])[:, :, np.newaxis]
y_test = np.array([data_test[i + seq_len, 0] for i in range(data_test.shape[0] - seq_len)])
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)
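# Input shape is (seq_len, n_features); the batch dimension is implicit.
X = Input(shape=(X_train.shape[1], X_train.shape[2]))
h = LSTM(hidden_size, activation='relu')(X)
# sigmoid bounds the output to (0, 1), so the targets are assumed to have been
# scaled into that range during data cleaning.
Y = Dense(output_dim, activation='sigmoid')(h)
model = Model(X, Y)
model.compile(loss='mean_squared_error', optimizer='adam')
# shuffle=False keeps the samples in time order during training.
model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, shuffle=False)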
y_pred = model.predict(X_test)
# The compiled loss is MSE, so evaluate() returns the mean squared error.
print('MSE Train:', model.evaluate(X_train, y_train, batch_size=batch_size))
print('MSE Test:', model.evaluate(X_test, y_test, batch_size=batch_size))
plt.plot(y_test, label='test')
plt.plot(y_pred, label='pred')
plt.legend()
plt.show()
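When recomputing the test MSE by hand from the arrays above, note that model.predict returns shape [N, 1] (one Dense unit) while y_test has shape [N]. The sketch below uses toy numbers, not results from the post, to show why the prediction should be flattened first.

```python
import numpy as np

y_test = np.array([0.2, 0.4, 0.6])        # toy targets, shape (3,)
y_pred = np.array([[0.1], [0.4], [0.9]])  # toy predictions, shape (3, 1)

# Flatten the predictions so the subtraction is element-wise.
mse = np.mean((y_test - y_pred.ravel()) ** 2)

# Without ravel(), broadcasting silently produces a (3, 3) matrix of differences.
wrong_shape = (y_test - y_pred).shape
print(mse, wrong_shape)
```

Averaging over the broadcast matrix would still return a number, just not the mean squared error, so the mistake is easy to miss.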
Experiment 2:
Use open, close, high, low, volume, and money from the previous 5 time steps
to predict the closing price at the current time step,
i.e. [None, 5, 6] => [None, 1]
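The six inputs span very different magnitudes (prices versus volume and money), and the output layer in the code uses a sigmoid, so the features and target presumably need to be scaled first. The data cleaning step is not shown in the post; the following is only one common option, assumed here for illustration: per-column min-max scaling fitted on the training rows. The helper names and toy numbers are hypothetical.

```python
import numpy as np

def fit_min_max(train):
    """Compute per-column minimum and range from the training rows only."""
    lo = train.min(axis=0)
    hi = train.max(axis=0)
    span = np.where(hi > lo, hi - lo, 1.0)  # avoid division by zero for constant columns
    return lo, span

def apply_min_max(data, lo, span):
    """Scale each column with statistics computed on the training set."""
    return (data - lo) / span

# Toy rows with two columns (a price and a volume), standing in for the six real features.
raw_train = np.array([[280.0, 1000.0], [290.0, 3000.0], [300.0, 2000.0]])
raw_test = np.array([[310.0, 2500.0]])

lo, span = fit_min_max(raw_train)
data_train = apply_min_max(raw_train, lo, span)
data_test = apply_min_max(raw_test, lo, span)
print(data_train)
print(data_test)
```

In this toy case the test price scales to 1.5 because it lies above the training maximum. A sigmoid output cannot reach values outside (0, 1), which is worth keeping in mind when the test period trades above the training range.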
import numpy as np
import matplotlib.pyplot as plt
from keras.layers import Input, Dense, LSTM
from keras.models import Model
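output_dim = 1     # one predicted value per sample: the closing price
batch_size = 256
epochs = 10
seq_len = 5        # number of past time steps in each input sample
hidden_size = 128  # number of LSTM units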
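# Build sliding windows over all feature columns: X has shape [samples, seq_len, 6].
# The target is read from column 0, so this assumes data_train / data_test store the
# closing price in column 0; if the columns follow the order open, close, high, low,
# volume, money, the close would be column 1 instead.
X_train = np.array([data_train[i : i + seq_len, :] for i in range(data_train.shape[0] - seq_len)])
y_train = np.array([data_train[i + seq_len, 0] for i in range(data_train.shape[0] - seq_len)])
X_test = np.array([data_test[i : i + seq_len, :] for i in range(data_test.shape[0] - seq_len)])
y_test = np.array([data_test[i + seq_len, 0] for i in range(data_test.shape[0] - seq_len)])
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)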
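# Input shape is (seq_len, n_features); the batch dimension is implicit.
X = Input(shape=(X_train.shape[1], X_train.shape[2]))
h = LSTM(hidden_size, activation='relu')(X)
# sigmoid bounds the output to (0, 1), so the targets are assumed to have been
# scaled into that range during data cleaning.
Y = Dense(output_dim, activation='sigmoid')(h)
model = Model(X, Y)
model.compile(loss='mean_squared_error', optimizer='adam')
# shuffle=False keeps the samples in time order during training.
model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, shuffle=False)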
y_pred = model.predict(X_test)
# The compiled loss is MSE, so evaluate() returns the mean squared error.
print('MSE Train:', model.evaluate(X_train, y_train, batch_size=batch_size))
print('MSE Test:', model.evaluate(X_test, y_test, batch_size=batch_size))
plt.plot(y_test, label='test')
plt.plot(y_pred, label='pred')
plt.legend()
plt.show()