株価予測
設定¶
In [1]:
import collections
import logging
import os
import pathlib
import re
import string
import sys
import time
from datetime import datetime, timedelta
import yfinance as yf
from ta import volatility as vol
from ta import trend
import ta
import math
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import tensorflow as tf
from tensorflow.python.client import device_lib
os.environ["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"
logging.getLogger("tensorflow").setLevel(logging.ERROR) # suppress warnings
display(device_lib.list_local_devices())
display(tf.config.list_physical_devices("GPU"))
関数定義¶
In [2]:
def fetch_stock_data(ticker, start_date, end_date):
return yf.download(ticker, start=start_date, end=end_date)
def set_date_axis_formatting(ax: matplotlib.axes.Axes):
ax.xaxis.set_major_locator(mdates.AutoDateLocator())
ax.xaxis.set_major_formatter(mdates.DateFormatter("%y-%m-%d"))
plt.setp(ax.get_xticklabels(), rotation=45, ha="right")
def create_sequences(data, seq_length):
"""
時系列データの変換
"""
X = []
y = []
for i in range(len(data) - seq_length):
# 指定された日数分の各指数
X.append(data[i : (i + seq_length), 1:])
# 翌日の終値
y.append(data[i + seq_length, 0])
return np.array(X), np.array(y)[..., np.newaxis]
def concatenate_arrays_or_return_second(array_first, array_second):
if array_first is None:
concatenated_array = array_second
else:
concatenated_array = np.concatenate((array_first, array_second), 0)
return concatenated_array
def make_batches(X, y, buffer_size, batch_size):
return (
tf.data.Dataset.from_tensor_slices((X, y))
.shuffle(buffer_size)
.batch(batch_size)
.prefetch(tf.data.AUTOTUNE)
)
データセットをダウンロードする¶
In [3]:
# 予測したい株のティッカーシンボル
tickers = [
"AAPL",
"GOOGL",
"MSFT",
"AMZN",
"TSLA",
"JNJ",
"V",
"PG",
"NVDA",
"IBM",
"GE",
"F",
"XOM",
"NOK",
"BB",
"M",
]
# tickers = tickers[:1]
start_date = "1950-01-01"
# start_date = '2022-01-01'
end_date = "2023-09-01"
# 描画する縦・横の枚数
grid_size = math.ceil(math.sqrt(len(tickers)))
# grid_size
In [4]:
# 全ての銘柄のデータを格納する辞書
data_all = {}
fig = plt.figure(figsize=(12, 10))
for i, ticker in enumerate(tickers):
# データ取得
data = fetch_stock_data(ticker, start_date, end_date)
# 欠損値の削除
data.dropna(inplace=True)
# 各銘柄のデータフレームの辞書を作成
data_all[ticker] = {"data": data}
print(data_all[ticker]["data"].shape)
ax = fig.add_subplot(grid_size, grid_size, i + 1)
ax.plot(data["Close"])
set_date_axis_formatting(ax)
ax.set_title(ticker)
ax.grid()
# グラフ間の間隔を調整
plt.subplots_adjust(hspace=1.0, wspace=0.3)
In [5]:
# Find the oldest start date and the latest end date
start_date = min(v["data"].index.min() for v in data_all.values())
end_date = max(v["data"].index.max() for v in data_all.values())
# Perform a binary search within this date range
while end_date - start_date > timedelta(days=1): # stop if the range is less than a day
mid_date = start_date + (end_date - start_date) / 2
train_size = sum((v["data"].index <= mid_date).sum() for v in data_all.values())
test_size = sum((v["data"].index > mid_date).sum() for v in data_all.values())
ratio = train_size / (train_size + test_size)
# print(mid_date, train_size, test_size, ratio)
if ratio > 0.9:
end_date = mid_date
else:
start_date = mid_date
split_date = mid_date
print(split_date, train_size, test_size, ratio)
テクニカル指標の計算¶
In [6]:
fig = plt.figure(figsize=(12, 10))
sma_windows = [5, 10, 15, 60]
bol_window = 20
rsi_window = 14
for i, ticker in enumerate(tickers):
data = data_all[ticker]["data"]
close = data["Close"]
if data.columns.shape[0] <= 6:
# テクニカル指標の計算
for sma_window in sma_windows:
data[f"SMA{sma_window}"] = trend.SMAIndicator(
close, window=sma_window
).sma_indicator()
data["BBL"] = vol.bollinger_lband(close, window=bol_window, window_dev=2)
data["BBH"] = vol.bollinger_hband(close, window=bol_window, window_dev=2)
data["BBM"] = vol.bollinger_mavg(close, window=bol_window)
data["RSI"] = ta.momentum.RSIIndicator(close, window=rsi_window).rsi()
macd = trend.MACD(close)
data["MACD"] = macd.macd()
data["MACD_signal"] = macd.macd_signal()
data["MACD_diff"] = macd.macd_diff()
# 欠損値の削除
data.dropna(inplace=True)
# 各銘柄のデータフレームの辞書を更新
data_all[ticker]["data"] = data
# print(data_all[ticker]["data"].shape)
ax = fig.add_subplot(grid_size, grid_size, i + 1)
ax.plot(data["Close"])
set_date_axis_formatting(ax)
ax.set_title(ticker)
ax.grid()
print(data_all[ticker]["data"].columns)
# グラフ間の間隔を調整
plt.subplots_adjust(hspace=1.0, wspace=0.3)
ハイパーパラメータ¶
In [7]:
target_data = ["Close"]
input_features = ["SMA5", "SMA10", "SMA60", "RSI", "BBL", "BBH", "MACD", "Volume"]
# input_features = [
# "SMA10", "RSI", "MACD", "Volume"
# ]
# ターゲットは必ず先頭に置くこと!!
learn_columns = target_data + input_features
num_features = len(input_features)
seq_length = 30
buffer_size = 10000
batch_size = 1024
num_layers = 8
d_model = num_features
dff = 32
num_heads = 2
dropout_rate = 0.1
epochs = 10
特徴量選択、データのスケーリング、データ分割¶
In [8]:
# スケーリングは訓練データとテストデータ別々で行う
# 試した限り、データは0から1の範囲内にした方が性能が良い
# Splitting data and scaling
for ticker in tickers:
data = data_all[ticker]["data"]
# Split the data into training and testing based on the split_date
train_data = data[data.index <= split_date]
test_data = data[data.index > split_date]
# Scaling
train_scaler = MinMaxScaler()
test_scaler = MinMaxScaler()
# Scale the training data and transform the testing data using the scaler fitted on training data
scaled_train_data = train_scaler.fit_transform(train_data[learn_columns]).astype(
np.float32
)
# scaled_test_data = train_scaler.transform(test_data[learn_columns]).astype(
# np.float32
# )
scaled_test_data = test_scaler.fit_transform(test_data[learn_columns]).astype(
np.float32
)
# Convert back to DataFrame for convenience
scaled_train_data = pd.DataFrame(
scaled_train_data, columns=learn_columns, index=train_data.index
)
scaled_test_data = pd.DataFrame(
scaled_test_data, columns=learn_columns, index=test_data.index
)
# Store the scaled data and the scaler
data_all[ticker]["train_scaler"] = train_scaler
data_all[ticker]["test_scaler"] = test_scaler
data_all[ticker]["scaled_train_data"] = scaled_train_data
data_all[ticker]["scaled_test_data"] = scaled_test_data
# print(
# ticker,
# data_all[ticker]["scaled_train_data"].shape,
# data_all[ticker]["scaled_test_data"].shape,
# )
In [9]:
# 辞書
X_train_all = {}
y_train_all = {}
X_test_all = {}
y_test_all = {}
for ticker in tickers:
scaled_train_data = data_all[ticker]["scaled_train_data"]
scaled_test_data = data_all[ticker]["scaled_test_data"]
# 入力と出力を分ける
X_train_all[ticker], y_train_all[ticker] = create_sequences(
scaled_train_data.values, seq_length
)
X_test_all[ticker], y_test_all[ticker] = create_sequences(
scaled_test_data.values, seq_length
)
# print(
# X_train_all[ticker].shape,
# y_train_all[ticker].shape,
# X_test_all[ticker].shape,
# y_test_all[ticker].shape,
# )
銘柄ごとのデータを統一¶
In [10]:
X_train_cat, y_train_cat, X_test_cat, y_test_cat = None, None, None, None
for i, ticker in enumerate(tickers):
X_train_cat = concatenate_arrays_or_return_second(X_train_cat, X_train_all[ticker])
y_train_cat = concatenate_arrays_or_return_second(y_train_cat, y_train_all[ticker])
X_test_cat = concatenate_arrays_or_return_second(X_test_cat, X_test_all[ticker])
y_test_cat = concatenate_arrays_or_return_second(y_test_cat, y_test_all[ticker])
# y_train_cat = y_train_cat[..., np.newaxis, np.newaxis]
# y_test_cat = y_test_cat[..., np.newaxis, np.newaxis]
print(X_train_cat.shape, y_train_cat.shape, X_test_cat.shape, y_test_cat.shape)
バッチ作成¶
In [11]:
train_batches = make_batches(X_train_cat, y_train_cat, buffer_size, batch_size)
test_batches = make_batches(X_test_cat, y_test_cat, buffer_size, batch_size)
# for i, j in enumerate(test_batches):
# if i < 2:
# x, y = j
# print(x)
# print(y)
# print(i)
位置エンコーディング¶
$$ PE_{(pos, 2i)} = sin(pos / 10000^{2i / d_{\text{model}}}) $$$$ PE_{(pos, 2i+1)} = cos(pos / 10000^{2i / d_{\text{model}}}) $$In [12]:
def get_angles(pos, i, d_model):
angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
return pos * angle_rates
def positional_encoding(position, d_model):
angle_rads = get_angles(
np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model
)
# apply sin to even indices in the array; 2i
angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
# apply cos to odd indices in the array; 2i+1
angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
# print(angle_rads.shape)
pos_encoding = angle_rads[np.newaxis, ...]
# print(pos_encoding.shape)
return tf.cast(pos_encoding, dtype=tf.float32)
位置エンコーディングの可視化¶
In [13]:
n, d = seq_length, num_features
# n, d = 2048, 512
pos_encoding = positional_encoding(n, d)
print(pos_encoding.shape)
pos_encoding = pos_encoding[0]
# Juggle the dimensions for the plot
pos_encoding = tf.reshape(pos_encoding, (n, d // 2, 2))
pos_encoding = tf.transpose(pos_encoding, (2, 1, 0))
pos_encoding = tf.reshape(pos_encoding, (d, n))
plt.pcolormesh(pos_encoding, cmap="RdBu")
plt.ylabel("Depth")
plt.xlabel("Position")
plt.colorbar()
plt.show()
Scaled Dot-Product Attention¶
In [14]:
def scaled_dot_product_attention(q, k, v, mask):
"""
Calculate the attention weights.
q, k, v must have matching leading dimensions.
k, v must have matching penultimate dimension, i.e.: seq_len_k = seq_len_v.
The mask has different shapes depending on its type(padding or look ahead)
but it must be broadcastable for addition.
Args:
q: query shape == (..., seq_len_q, depth)
k: key shape == (..., seq_len_k, depth)
v: value_shape == (..., seq_len_v, depth_v)
mask: Float tensor with shape broadcastable
to (..., seq_len_q, seq_len_k). Defaults to None.
Returns:
output, attention_weights
"""
# (..., seq_len_q, seq_len_k)
matmul_qk = tf.matmul(q, k, transpose_b=True)
# scale matmul_qk
dk = tf.cast(tf.shape(k)[-1], tf.float32)
scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
# デバッグ
# print(scaled_attention_logits.shape)
# if mask is None:
# print(mask)
# else:
# print(mask.shape)
# add the mask to the scaled tensor.
if mask is not None:
# print(scaled_attention_logits)
# print(mask)
scaled_attention_logits += mask * -1e9
# print(scaled_attention_logits)
# soft_max is normalized on the last axis (seq_len_k) so that the scores
# add up to 1.
attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
output = tf.matmul(attention_weights, v) # (..., seq_len_q, depth_v)
return output, attention_weights
def print_out(q, k, v):
temp_out, temp_attn = scaled_dot_product_attention(q, k, v, None)
print("Attention weights are:")
print(temp_attn)
print("Output is:")
print(temp_out)
np.set_printoptions(suppress=True)
temp_k = tf.constant([[10, 0, 0], [0, 10, 0], [0, 0, 10], [0, 0, 10]], dtype=tf.float32)
temp_v = tf.constant([[1, 0], [10, 0], [100, 5], [1000, 6]], dtype=tf.float32)
temp_q = tf.constant([[0, 0, 10], [0, 10, 0], [10, 10, 0]], dtype=tf.float32)
print_out(temp_q, temp_k, temp_v)
マルチヘッドアテンション¶
In [15]:
class MultiHeadAttention(tf.keras.layers.Layer):
def __init__(self, d_model, num_heads):
super(MultiHeadAttention, self).__init__()
self.num_heads = num_heads
self.d_model = d_model
assert d_model % self.num_heads == 0
self.depth = d_model // self.num_heads
self.wq = tf.keras.layers.Dense(d_model)
self.wk = tf.keras.layers.Dense(d_model)
self.wv = tf.keras.layers.Dense(d_model)
self.dense = tf.keras.layers.Dense(d_model)
def split_heads(self, x, batch_size):
"""
Split the last dimension into (num_heads, depth).
Transpose the result such that the shape is (batch_size, num_heads, seq_len, depth)
"""
x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
return tf.transpose(x, perm=[0, 2, 1, 3])
def call(self, v, k, q, mask):
batch_size = tf.shape(q)[0]
q = self.wq(q) # (batch_size, seq_len, d_model)
k = self.wk(k) # (batch_size, seq_len, d_model)
v = self.wv(v) # (batch_size, seq_len, d_model)
# (batch_size, num_heads, seq_len, depth)
q = self.split_heads(q, batch_size)
# (batch_size, num_heads, seq_len, depth)
k = self.split_heads(k, batch_size)
# (batch_size, num_heads, seq_len, depth)
v = self.split_heads(v, batch_size)
# scaled_attention.shape == (batch_size, num_heads, seq_len_q, depth)
# attention_weights.shape = (batch_size, num_heads, seq_len_q, seq_len_k)
scaled_attention, attention_weights = scaled_dot_product_attention(
q, k, v, mask
)
# (batch_size, seq_len_q, num_heads, depth)
scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
concat_attention = tf.reshape(
scaled_attention, (batch_size, -1, self.d_model)
) # (batch_size, seq_len_q, d_model)
# (batch_size, seq_len_q, d_model)
output = self.dense(concat_attention)
return output, attention_weights
temp_mha = MultiHeadAttention(d_model=num_features, num_heads=4)
# (batch_size, encoder_sequence, d_model)
y = tf.random.uniform((1, seq_length, num_features))
out, attn = temp_mha(y, k=y, q=y, mask=None)
out.shape, attn.shape
Out[15]:
ポイントワイズフィードフォワードネットワーク¶
In [16]:
def point_wise_feed_forward_network(d_model, dff):
return tf.keras.Sequential(
[
# (batch_size, seq_len, dff)
tf.keras.layers.Dense(dff, activation="relu"),
tf.keras.layers.Dense(d_model), # (batch_size, seq_len, d_model)
]
)
sample_ffn = point_wise_feed_forward_network(num_features, seq_length)
sample_ffn(tf.random.uniform((batch_size, seq_length, num_features))).shape
Out[16]:
エンコーダーレイヤー¶
In [17]:
class EncoderLayer(tf.keras.layers.Layer):
def __init__(self, d_model, num_heads, dff, rate=0.1):
super(EncoderLayer, self).__init__()
self.mha = MultiHeadAttention(d_model, num_heads)
self.ffn = point_wise_feed_forward_network(d_model, dff)
self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.dropout1 = tf.keras.layers.Dropout(rate)
self.dropout2 = tf.keras.layers.Dropout(rate)
def call(self, x, training, mask):
# (batch_size, input_seq_len, d_model)
attn_output, _ = self.mha(x, x, x, mask)
attn_output = self.dropout1(attn_output, training=training)
# (batch_size, input_seq_len, d_model)
out1 = self.layernorm1(x + attn_output)
ffn_output = self.ffn(out1) # (batch_size, input_seq_len, d_model)
ffn_output = self.dropout2(ffn_output, training=training)
# (batch_size, input_seq_len, d_model)
out2 = self.layernorm2(out1 + ffn_output)
return out2
sample_encoder_layer = EncoderLayer(num_features, 4, 32)
sample_encoder_layer_output = sample_encoder_layer(
tf.random.uniform((batch_size, seq_length, num_features)), False, None
)
sample_encoder_layer_output.shape # (batch_size, input_seq_len, d_model)
Out[17]:
エンコーダー¶
In [18]:
class Encoder(tf.keras.layers.Layer):
def __init__(
self, num_layers, d_model, num_heads, dff, maximum_position_encoding, rate=0.1
):
super(Encoder, self).__init__()
self.d_model = d_model
self.num_layers = num_layers
self.pos_encoding = positional_encoding(maximum_position_encoding, self.d_model)
self.enc_layers = [
EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)
]
self.dropout = tf.keras.layers.Dropout(rate)
def call(self, x, training, mask):
# x.shape == (batch_size, input_seq_len, d_model)
seq_len = tf.shape(x)[1]
x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
x += self.pos_encoding[:, :seq_len, :]
x = self.dropout(x, training=training)
for i in range(self.num_layers):
x = self.enc_layers[i](x, training, mask)
return x # (batch_size, input_seq_len, d_model)
sample_encoder = Encoder(
num_layers=2,
d_model=num_features,
num_heads=4,
dff=32,
maximum_position_encoding=1000,
)
temp_input = tf.random.uniform(
(batch_size, seq_length, num_features), dtype=tf.float32, minval=-1, maxval=1
)
sample_encoder_output = sample_encoder(temp_input, training=False, mask=None)
sample_encoder_output.shape # (batch_size, input_seq_len, d_model)
Out[18]:
トランスフォーマーを作成する¶
In [19]:
class Transformer(tf.keras.Model):
"""
エンコーダのみ
"""
def __init__(
self, num_layers, d_model, num_heads, dff, maximum_input_seq_len, rate=0.1
):
super(Transformer, self).__init__()
self.encoder = Encoder(
num_layers, d_model, num_heads, dff, maximum_input_seq_len, rate
)
self.final_layer = tf.keras.layers.Dense(1)
def call(self, inp, training):
# (batch_size, inp_seq_len, d_model)
enc_output = self.encoder(inp, training, None)
# 平均化プーリング
# (batch_size, d_model)
pooled_output = tf.reduce_mean(enc_output, axis=1)
# (batch_size, 1)
final_output = self.final_layer(pooled_output)
return final_output
sample_transformer = Transformer(
num_layers=2, d_model=num_features, num_heads=4, dff=32, maximum_input_seq_len=1000
)
temp_input = tf.random.uniform(
(batch_size, seq_length, num_features), dtype=tf.float32, minval=-1, maxval=1
)
print(temp_input.shape)
fn_out = sample_transformer(temp_input, training=False)
fn_out.shape # (batch_size, tar_seq_len, 1)
Out[19]:
オプティマイザ¶
$$ lrate = d^{-0.5}_{model}*min(step\_ num^{-0.5},step\_ num \cdot warmup\_ steps^{-1.5}) $$In [20]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
def __init__(self, d_model, warmup_steps=4000):
super(CustomSchedule, self).__init__()
self.d_model = tf.cast(d_model, tf.float32)
self.warmup_steps = warmup_steps
def __call__(self, step):
step = tf.cast(step, dtype=tf.float32) # Add this line
arg1 = tf.math.rsqrt(step)
arg2 = step * (self.warmup_steps**-1.5)
return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)
In [21]:
learning_rate = CustomSchedule(d_model)
optimizer = tf.keras.optimizers.Adam(
learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9
)
temp_learning_rate_schedule = CustomSchedule(d_model)
plt.plot(temp_learning_rate_schedule(tf.range(40000, dtype=tf.float32)))
plt.grid()
plt.ylabel("learning_rate")
plt.xlabel("Train Step")
Out[21]:
In [22]:
loss_object = tf.keras.losses.MeanSquaredError(reduction="none")
def loss_function(real, pred):
loss_ = loss_object(real, pred)
return tf.reduce_sum(loss_)
In [23]:
class CustomMeanMetric(tf.keras.metrics.Metric):
def __init__(self, name="custom_mean_metric", **kwargs):
super(CustomMeanMetric, self).__init__(name=name, **kwargs)
self.total = self.add_weight(name="total", initializer="zeros")
self.count = self.add_weight(name="count", initializer="zeros")
def update_state(self, values, sample_weight=None):
self.total.assign_add(tf.reduce_sum(values))
self.count.assign_add(tf.cast(tf.size(values), "float32"))
def result(self):
return self.total / self.count
def reset_states(self):
self.total.assign(0.0)
self.count.assign(0.0)
# Replace train_loss = tf.keras.metrics.Mean(name='train_loss')
# with the new CustomMeanMetric
train_loss = CustomMeanMetric(name="train_loss")
val_loss = CustomMeanMetric(name="val_loss")
モデル構築¶
In [24]:
transformer = Transformer(
num_layers=num_layers,
d_model=d_model,
num_heads=num_heads,
dff=dff,
maximum_input_seq_len=1000,
rate=dropout_rate,
)
In [25]:
checkpoint_path = "./checkpoints/multi_stock_transformer"
ckpt = tf.train.Checkpoint(transformer=transformer, optimizer=optimizer)
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)
# if a checkpoint exists, restore the latest checkpoint.
if ckpt_manager.latest_checkpoint:
ckpt.restore(ckpt_manager.latest_checkpoint)
print("Latest checkpoint restored!!")
In [26]:
# The @tf.function trace-compiles train_step into a TF graph for faster
# execution. The function specializes to the precise shape of the argument
# tensors. To avoid re-tracing due to the variable sequence lengths or variable
# batch sizes (the last batch is smaller), use input_signature to specify
# more generic shapes.
train_step_signature = [
tf.TensorSpec(shape=(None, None, None), dtype=tf.float32),
tf.TensorSpec(shape=(None, None), dtype=tf.float32),
]
@tf.function(input_signature=train_step_signature)
def train_step(inp, tar):
with tf.GradientTape() as tape:
predictions = transformer(inp, training=True)
loss_ = loss_object(tar, predictions)
loss = tf.reduce_sum(loss_)
gradients = tape.gradient(loss, transformer.trainable_variables)
optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))
train_loss(loss_)
@tf.function(input_signature=train_step_signature)
def val_step(inp, tar):
with tf.GradientTape():
predictions = transformer(inp, training=False)
loss_ = loss_object(tar, predictions)
val_loss(loss_)
学習¶
In [27]:
for epoch in range(epochs):
start = time.time()
train_loss.reset_states()
val_loss.reset_states()
for batch, (inp, tar) in enumerate(train_batches):
train_step(inp, tar)
if (batch + 1) % 50 == 0:
print(f"Epoch {epoch + 1} Batch {batch + 1} Loss {train_loss.result():.4f}")
for batch, (inp, tar) in enumerate(test_batches):
val_step(inp, tar)
# 遅いので毎エポック保存
ckpt_save_path = ckpt_manager.save()
print(f"Saving checkpoint for epoch {epoch+1} at {ckpt_save_path}")
print(
f"Epoch {epoch + 1} Train Loss {train_loss.result():.4f} Val Loss {val_loss.result():.4f}"
)
print(f"Time taken for 1 epoch: {time.time() - start:.2f} secs\n")
テストデータの実測値と予測値の比較¶
In [28]:
def plot_price_and_prediction(ticker, actual_price, predicted_price, ax):
ax.xaxis.set_major_locator(mdates.AutoDateLocator())
ax.xaxis.set_major_formatter(mdates.DateFormatter("%y-%m-%d"))
plt.setp(ax.get_xticklabels(), rotation=45, ha="right")
ax.plot(actual_price, label="Actual")
ax.plot(predicted_price, label="Prediction")
ax.grid()
ax.set_title(ticker)
ax.legend()
fig = plt.figure(figsize=(12, 8))
for i, ticker in enumerate(tickers):
test_data = tf.convert_to_tensor(X_test_all[ticker])
predictions = transformer(test_data, training=False)
# スケールを戻す
unscaled_prediction = data_all[ticker]["test_scaler"].inverse_transform(
np.hstack((predictions, np.zeros((predictions.shape[0], num_features))))
)[:, 0]
scaled_test_data = data_all[ticker]["scaled_test_data"]
actual_price = data_all[ticker]["data"]["Close"][scaled_test_data.index]
predicted_price = pd.Series(
unscaled_prediction, index=actual_price.index[seq_length:]
)
predicted_price
# グラフ描画
ax = fig.add_subplot(grid_size, grid_size, i + 1)
plot_price_and_prediction(ticker, actual_price, predicted_price, ax)
# グラフ間の間隔を調整
plt.subplots_adjust(hspace=1.0, wspace=0.3)
In [ ]: