PythonとTensorFlowでAIを学ぶ (自然言語翻訳)
TensorFlowの公式チュートリアルを参考にしています。
スペイン語から英語への翻訳を行うモデルを構築します。
%pip install tensorflow_text
%pip install einops
%pip install matplotlib
import numpy as np
import typing
from typing import Any, Tuple
import tensorflow as tf
import tensorflow_text as tf_text
import einops
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from tensorflow.python.client import device_lib
import pathlib
print(tf.__version__)
display(device_lib.list_local_devices())
display(tf.config.list_physical_devices("GPU"))
ShapeChecker クラスの説明¶
このチュートリアルでは、低レベルのAPIを多用するため、テンソルの形状を間違えやすくなっています。ShapeChecker
クラスは、チュートリアル全体を通してテンソルの形状を確認するために使用されます。
機能:¶
- 形状の確認:与えられたテンソルの形状が期待される形状と一致するかをチェックします。
- キャッシュの利用:一度見た変数の形状をキャッシュし、後続のチェックで利用します。
- ブロードキャストの考慮:ブロードキャスト(形状の自動調整)が行われる場合、それを考慮に入れます。
メソッド:¶
__init__
: クラスの初期化メソッド。軸名とその形状のキャッシュを保持するための辞書を作成します。__call__
: クラスの呼び出しメソッド。指定されたテンソルと軸名を受け取り、それらの形状をチェックします。
使用方法:¶
- テンソルと、そのテンソルの各軸に名前を割り当てます(例:
"batch time"
)。 - このクラスを呼び出し、テンソルと軸名を渡します。
- テンソルの形状が以前に保存された形状と一致しない場合は、
ValueError
を発生させます。
このクラスは、形状の不一致によるエラーを早期に発見するのに役立ちます。
class ShapeChecker():
def __init__(self):
# Keep a cache of every axis-name seen
self.shapes = {}
def __call__(self, tensor: tf.Tensor, names: str, broadcast: bool = False):
if not tf.executing_eagerly():
return
parsed = einops.parse_shape(tensor, names)
for name, new_dim in parsed.items():
old_dim = self.shapes.get(name, None)
if (broadcast and new_dim == 1):
continue
if old_dim is None:
# If the axis name is new, add its length to the cache
self.shapes[name] = new_dim
continue
if new_dim != old_dim:
raise ValueError(f"Shape mismatch for dimension: '{name}'\n"
f" found: {new_dim}"
f" expected: {old_dim}")
データセットのダウンロード¶
# Download the file
path_to_zip = tf.keras.utils.get_file(
'spa-eng.zip', origin='http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip',
extract=True)
path_to_file = pathlib.Path(path_to_zip).parent / "spa-eng/spa.txt"
def load_data(path: str) -> Tuple[np.ndarray, np.ndarray]:
"""
指定したパスからデータを読み込みます。
Args:
path: データが保存されているファイルのパス。
Returns:
Tuple[np.ndarray, np.ndarray]: 入力データと出力データのペア。
"""
text = path.read_text(encoding='utf-8')
lines = text.splitlines()
pairs = [line.split('\t') for line in lines]
# ペアは(英語の文章、スペイン語の文章)という形式になっているらしい。
# ターゲットが左側にあることに違和感がある。
context = np.array([context for target, context in pairs])
target = np.array([target for target, context in pairs])
return target, context
target_raw, context_raw = load_data(path_to_file)
print(context_raw[-1])
print(target_raw[-1])
tf.dataデータセットを作成¶
BUFFER_SIZE = len(context_raw)
BATCH_SIZE = 64
is_train = np.random.uniform(size=(len(target_raw),)) < 0.8
train_raw = (
tf.data.Dataset
.from_tensor_slices((context_raw[is_train], target_raw[is_train]))
.shuffle(BUFFER_SIZE)
.batch(BATCH_SIZE))
val_raw = (
tf.data.Dataset
.from_tensor_slices((context_raw[~is_train], target_raw[~is_train]))
.shuffle(BUFFER_SIZE)
.batch(BATCH_SIZE))
for example_context_strings, example_target_strings in train_raw.take(1):
print(example_context_strings[:5])
print()
print(example_target_strings[:5])
break
example_text = tf.constant('¿Todavía está en casa?')
print(example_text.numpy())
print(example_text.numpy().decode())
print(tf_text.normalize_utf8(example_text, 'NFKD').numpy())
print(tf_text.normalize_utf8(example_text, 'NFKD').numpy().decode())
テキストの前処理を行う関数¶
def tf_lower_and_split_punct(text: tf.Tensor) -> tf.Tensor:
"""
与えられたテキストの前処理を行います。
この関数は以下の処理を行います:
1. アクセント付きの文字を分割するために、NFKD正規化を使用します。
2. テキストを全て小文字に変換します。
3. スペース、アルファベット、一部の句読点 ( . , ! ? ¿ ) を除き、他の全ての文字を削除します。
4. 句読点 ( . ? ! , ¿ ) の前後にスペースを挿入します。
5. 連続するスペースを一つにまとめます。
6. テキストの前後に '[START]' と '[END]' トークンを追加します。
Args:
text (tf.Tensor): 前処理を行うテキストが格納されたテンソル。
Returns:
tf.Tensor: 前処理が行われたテキストが格納されたテンソル。
"""
# Split accented characters.
text = tf_text.normalize_utf8(text, 'NFKD')
text = tf.strings.lower(text)
# Keep space, a to z, and select punctuation.
text = tf.strings.regex_replace(text, '[^ a-z.?!,¿]', '')
# Add spaces around punctuation.
text = tf.strings.regex_replace(text, '[.?!,¿]', r' \0 ')
# Strip white space.
text = tf.strings.strip(text)
text = tf.strings.join(['[START]', text, '[END]'], separator=' ')
return text
print(example_text.numpy().decode())
print(tf_lower_and_split_punct(example_text).numpy().decode())
テキストのトークン化¶
このコードは、TensorFlowの tf.keras.layers.TextVectorization
レイヤーを使用して、テキストデータの前処理とトークン化を行うための設定をしています。具体的には以下の処理を行います:
テキストの標準化 (Standardization):
tf_lower_and_split_punct
関数を使用して、テキストデータを標準化します。
語彙の抽出とトークン化 (Vocabulary Extraction and Tokenization):
- このレイヤーは入力テキストから語彙(単語のセット)を抽出し、各単語をトークン(一意の数値ID)に変換します。これにより、テキストデータはニューラルネットワークが処理できる数値のシーケンスに変換されます。
最大語彙サイズの設定 (Setting the Maximum Vocabulary Size):
max_vocab_size = 5000
は、語彙に含める単語の最大数を 5000 に制限することを意味します。これにより、トレーニングの計算コストを抑えつつ、出現頻度が高い単語をカバーすることができます。
ラギッドテンソル (Ragged Tensor):
ragged=True
は、出力をラギッドテンソル(不規則な長さのテンソル)の形式で生成することを意味します。テキストデータは通常、異なる長さを持つため、このオプションにより各サンプルの長さを個別に扱うことができます。
TextVectorization
レイヤーは、ニューラルネットワークが処理できる形式にテキストデータを自動的に変換するための強力なツールです。これにより、テキストデータの前処理とトークン化のプロセスが簡素化され、機械学習モデルのトレーニングが容易になります。
max_vocab_size = 5000
context_text_processor = tf.keras.layers.TextVectorization(
standardize=tf_lower_and_split_punct,
max_tokens=max_vocab_size,
ragged=True)
context_text_processor.adapt(train_raw.map(lambda context, target: context))
# 先頭10個の語彙を表示。
context_text_processor.get_vocabulary()[:10]
# ターゲット用のベクトル化レイヤー。
target_text_processor = tf.keras.layers.TextVectorization(
standardize=tf_lower_and_split_punct,
max_tokens=max_vocab_size,
ragged=True)
target_text_processor.adapt(train_raw.map(lambda context, target: target))
target_text_processor.get_vocabulary()[:10]
# これらの層は、文字列のバッチをトークンID配列のバッチに変換できます。
example_tokens = context_text_processor(example_context_strings)
example_tokens[:3, :]
トークンからテキストへの逆変換¶
context_vocab = np.array(context_text_processor.get_vocabulary())
tokens = context_vocab[example_tokens[0].numpy()]
' '.join(tokens)
トークンIDとマスクの可視化¶
マスクは、テキスト長を揃えるために使用されます。
plt.subplot(1, 2, 1)
plt.pcolormesh(example_tokens.to_tensor())
plt.title('Token IDs')
plt.subplot(1, 2, 2)
plt.pcolormesh(example_tokens.to_tensor() != 0)
_ = plt.title('Mask')
データセットの処理¶
以下の process_text
関数は、文字列のデータセットをトークンIDのテンソルに変換し、0でパディングします。さらに、(context, target)のペアを、Kerasの Model.fit
でのトレーニングに適した ((context, target_in), target_out) のペアに変換します。
Kerasは入力として(inputs, labels)のペアを期待しています。ここでの入力は(context, target_in)であり、ラベルはtarget_outです。target_inとtarget_outの違いは、相互に対して一つのステップずれていることです。これにより、各位置でのラベルが次のトークンになります。
def process_text(context: tf.Tensor, target: tf.Tensor) -> Tuple[Tuple[tf.Tensor, tf.Tensor], tf.Tensor]:
"""
与えられたコンテキストとターゲットを、`Model.fit` で使用できる形式に変換します。
この関数は以下の処理を行います:
1. コンテキストをトークン化し、固定長のテンソルに変換します。
2. ターゲットをトークン化します。
3. ターゲットテンソルの最後のトークンを除いたものを `targ_in` とし、最初のトークンを除いたものを `targ_out` とします。
4. `(context, targ_in)` と `targ_out` のタプルを返します。
Args:
context (tf.Tensor): コンテキストが格納されたテンソル。
target (tf.Tensor): ターゲットが格納されたテンソル。
Returns:
Tuple[Tuple[tf.Tensor, tf.Tensor], tf.Tensor]: `Model.fit` で使用できる形式のタプル。
"""
context = context_text_processor(context).to_tensor()
target = target_text_processor(target)
targ_in = target[:,:-1].to_tensor() # 末尾以外のトークン
targ_out = target[:,1:].to_tensor() # 先頭以外のトークン
return (context, targ_in), targ_out
train_ds = train_raw.map(process_text, tf.data.AUTOTUNE)
val_ds = train_raw.map(process_text, tf.data.AUTOTUNE)
for (ex_context_tok, ex_tar_in), ex_tar_out in train_ds.take(1):
print(ex_context_tok[0, :10].numpy())
print()
print(ex_tar_in[0, :10].numpy())
print(ex_tar_out[0, :10].numpy())
埋め込みベクトルの次元数¶
UNITS = 256
エンコーダ¶
class Encoder(tf.keras.layers.Layer):
def __init__(self, text_processor: tf.keras.layers.TextVectorization, units: int):
super(Encoder, self).__init__()
self.text_processor = text_processor
self.vocab_size = text_processor.vocabulary_size()
self.units = units
# 組み込みレイヤは、トークンIDをベクトルに変換します。
self.embedding = tf.keras.layers.Embedding(self.vocab_size, self.units,
mask_zero=True)
# RNNは、ベクトルのシーケンスを処理します。
self.rnn = tf.keras.layers.Bidirectional(
merge_mode='sum',
layer=tf.keras.layers.GRU(units,
return_sequences=True,
recurrent_initializer='glorot_uniform'))
def call(self, x: tf.Tensor) -> tf.Tensor:
shape_checker = ShapeChecker()
shape_checker(x, 'batch s')
# トークンIDをベクトルに変換します。
x = self.embedding(x)
shape_checker(x, 'batch s units')
# GRUは、ベクトルのシーケンスを処理します。
x = self.rnn(x)
shape_checker(x, 'batch s units')
# 新しいベクトルのシーケンスを返します。
return x
def convert_input(self, texts: tf.RaggedTensor) -> tf.Tensor:
texts = tf.convert_to_tensor(texts)
if len(texts.shape) == 0:
texts = tf.convert_to_tensor(texts)[tf.newaxis]
context = self.text_processor(texts).to_tensor()
context = self(context)
return context
# Encode the input sequence.
encoder = Encoder(context_text_processor, UNITS)
ex_context = encoder(ex_context_tok)
print(f'Context tokens, shape (batch, s): {ex_context_tok.shape}')
print(f'Encoder output, shape (batch, s, units): {ex_context.shape}')
アテンション層(CrossAttention クラス)¶
アテンション層は、デコーダにエンコーダからの情報へのアクセスを可能にする層です。この層は、コンテキストシーケンス全体から単一のベクトルを計算し、それをデコーダの出力に追加します。
機能:¶
- 加重平均の計算:
- シーケンス全体から単一のベクトルを計算する最も単純な方法は、シーケンス全体の平均を取ることです。アテンション層はこれに似ていますが、コンテキストシーケンス全体にわたる加重平均を計算します。重みはコンテキストベクトルと「クエリ」ベクトルの組み合わせから計算されます。
コンポーネント:¶
MultiHeadAttention:
- クエリ(x)とコンテキスト(context)の間のアテンションを計算します。この例では、
key_dim=units
とnum_heads=1
で設定されています。
- クエリ(x)とコンテキスト(context)の間のアテンションを計算します。この例では、
LayerNormalization:
- アテンションの出力と元の入力を組み合わせた後に、レイヤー正規化を行います。
Add:
- アテンションの出力と元の入力を加算します。
メソッド:¶
call
:- 入力
x
(クエリ)とcontext
(コンテキスト)を受け取り、アテンションを適用します。 mha
を用いてアテンションの出力とスコアを計算します。- アテンションスコアは平均化され、
last_attention_weights
に保存されます。 - 出力は
add
とlayernorm
を通して最終的な結果に加工されます。
- 入力
このアテンション層により、デコーダはコンテキストデータの重要な部分に焦点を合わせ、より効果的な学習と予測が可能になります。
class CrossAttention(tf.keras.layers.Layer):
def __init__(self, units: int, **kwargs: Any):
super().__init__()
self.mha = tf.keras.layers.MultiHeadAttention(key_dim=units, num_heads=1, **kwargs)
self.layernorm = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.add = tf.keras.layers.Add()
def call(self, x: tf.Tensor, context: tf.Tensor):
shape_checker = ShapeChecker()
shape_checker(x, 'batch t units')
shape_checker(context, 'batch s units')
attn_output, attn_scores = self.mha(
query=x,
value=context,
return_attention_scores=True)
shape_checker(x, 'batch t units')
shape_checker(attn_scores, 'batch heads t s')
# 後にプロットするために注意の重みを保持します。
attn_scores = tf.reduce_mean(attn_scores, axis=1)
shape_checker(attn_scores, 'batch t s')
self.last_attention_weights = attn_scores
x = self.add([x, attn_output])
x = self.layernorm(x)
return x
attention_layer = CrossAttention(UNITS)
#
embed = tf.keras.layers.Embedding(target_text_processor.vocabulary_size(),
output_dim=UNITS, mask_zero=True)
ex_tar_embed = embed(ex_tar_in)
result = attention_layer(ex_tar_embed, ex_context)
print(f'Context sequence, shape (batch, s, units): {ex_context.shape}')
print(f'Target sequence, shape (batch, t, units) : {ex_tar_embed.shape}')
print(f'Attention result, shape (batch, t, units): {result.shape}')
print(f'Attention weights, shape (batch, t, s) : {attention_layer.last_attention_weights.shape}')
# 確率分布なのでValueごとの確率を合計すると1になる?
attention_layer.last_attention_weights[0].numpy().sum(axis=-1)
attention_weights = attention_layer.last_attention_weights
mask = (ex_context_tok != 0).numpy()
plt.subplot(1, 2, 1)
plt.pcolormesh(mask * attention_weights[:, 0, :])
plt.title('Attention weights')
plt.subplot(1, 2, 2)
plt.pcolormesh(mask)
plt.title('Mask')
デコーダ¶
機能:¶
- ターゲットシーケンスの処理:
- ターゲットシーケンス内の各トークンに対して埋め込みを検索します。
- RNNの使用:
- RNN(ここではGRU)を使用してターゲットシーケンスを処理し、これまでに生成した内容を追跡します。
- アテンション層との連携:
- RNNの出力をアテンション層の「クエリ」として使用し、エンコーダの出力に注目します。
- 次のトークンの予測:
- 出力シーケンスの各位置で次のトークンを予測します。
コンポーネント:¶
- テキストプロセッサ:
TextVectorization
レイヤを使用して、テキストをトークンIDに変換します。
- 単語とIDの変換レイヤ:
StringLookup
レイヤを使用して、単語をIDに、IDを単語に変換します。
- 埋め込みレイヤ (Embedding):
- トークンIDを密なベクトルに変換します。
- RNNレイヤ (GRU):
- シーケンスデータの処理に使用され、シーケンス全体の出力と最終状態を返します。
- アテンションレイヤ (CrossAttention):
- エンコーダの出力に対するアテンションを計算します。
- 出力レイヤ (Dense):
- RNNの出力を語彙サイズのlogitsに変換します。
モデルの動作:¶
- トレーニング時:
- モデルは各位置で次の単語を予測します。情報が一方向にのみ流れるようにすることが重要です。そのため、デコーダはターゲットシーケンスを処理するために単方向(非双方向)のRNNを使用します。
- 推論時:
- モデルは一度に一つの単語を生成し、それをモデルにフィードバックします。
class Decoder(tf.keras.layers.Layer):
@classmethod
def add_method(cls, fun):
"""
後からメソッドを追加するためのデコレータです。
Args:
fun: 追加するメソッド。
Returns:
Any: 追加されたメソッド。
"""
setattr(cls, fun.__name__, fun)
return fun
def __init__(self, text_processor, units):
super(Decoder, self).__init__()
self.text_processor = text_processor
self.vocab_size = text_processor.vocabulary_size()
self.word_to_id = tf.keras.layers.StringLookup(
vocabulary=text_processor.get_vocabulary(),
mask_token='', oov_token='[UNK]')
self.id_to_word = tf.keras.layers.StringLookup(
vocabulary=text_processor.get_vocabulary(),
mask_token='', oov_token='[UNK]',
invert=True)
self.start_token = self.word_to_id('[START]')
self.end_token = self.word_to_id('[END]')
self.units = units
# 1. The embedding layer converts token IDs to vectors
self.embedding = tf.keras.layers.Embedding(self.vocab_size,
units, mask_zero=True)
# 2. The RNN keeps track of what's been generated so far.
# どうやらGPUを使うと、return_state=Trueで不具合が起きるらしい。
# 代わりに、return_sequences=Trueで返る末尾の値を使えばいいのでは?
self.rnn = tf.keras.layers.GRU(units,
return_sequences=True,
return_state=False, # TrueだとGPUでエラーが起きる。
recurrent_initializer='glorot_uniform')
# 3. The RNN output will be the query for the attention layer.
self.attention = CrossAttention(units)
# 4. This fully connected layer produces the logits for each
# output token.
self.output_layer = tf.keras.layers.Dense(self.vocab_size)
@Decoder.add_method
def call(self,
context, x,
state=None,
return_state=False):
shape_checker = ShapeChecker()
shape_checker(x, 'batch t')
shape_checker(context, 'batch s units')
# 1. Lookup the embeddings
x = self.embedding(x)
shape_checker(x, 'batch t units')
# 2. Process the target sequence.
# GPUだとここでなぜかエラーになる。
# x, state = self.rnn(x, initial_state=state)
x = self.rnn(x, initial_state=state)
# 末尾の要素を取得する。
state = x[:, -1, :]
shape_checker(x, 'batch t units')
# 3. Use the RNN output as the query for the attention over the context.
x = self.attention(x, context)
self.last_attention_weights = self.attention.last_attention_weights
shape_checker(x, 'batch t units')
shape_checker(self.last_attention_weights, 'batch t s')
# Step 4. Generate logit predictions for the next token.
logits = self.output_layer(x)
shape_checker(logits, 'batch t target_vocab_size')
if return_state:
return logits, state
else:
return logits
decoder = Decoder(target_text_processor, UNITS)
logits = decoder(ex_context, ex_tar_in)
print(f'encoder output shape: (batch, s, units) : {ex_context.shape}')
print(f'input target tokens shape: (batch, t) : {ex_tar_in.shape}')
print(f'logits shape shape: (batch, target_vocabulary_size): {logits.shape}')
@Decoder.add_method
def get_initial_state(self, context):
batch_size = tf.shape(context)[0]
start_tokens = tf.fill([batch_size, 1], self.start_token)
done = tf.zeros([batch_size, 1], dtype=tf.bool)
# embedded = self.embedding(start_tokens)
return start_tokens, done, self.rnn.get_initial_state(batch_size=batch_size)[0]
@Decoder.add_method
def tokens_to_text(self, tokens: tf.Tensor):
words = self.id_to_word(tokens)
result = tf.strings.reduce_join(words, axis=-1, separator=' ')
result = tf.strings.regex_replace(result, '^ *\[START\] *', '')
result = tf.strings.regex_replace(result, ' *\[END\] *$', '')
return result
@Decoder.add_method
def get_next_token(self, context: tf.Tensor, next_token: tf.Tensor, done: tf.Tensor, state: tf.Tensor, temperature: float = 0.0) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
"""
次のトークンを生成し、デコーダの状態を更新します。
Args:
context (tf.Tensor): デコーダへの入力コンテキスト。
next_token (tf.Tensor): 生成する次のトークン。
done (tf.Tensor): すでに終了トークンが生成されたかどうかを示すブールテンソル。
state (tf.Tensor): デコーダの現在の状態。
temperature (float): 出力のランダム性を調整するための温度パラメータ。デフォルトは0.0 (最も確信度の高いトークンを選択)。
Returns:
Tuple[tf.Tensor, tf.Tensor, tf.Tensor]: 生成された次のトークン、終了フラグ、更新された状態。
"""
logits, state = self(
context, next_token,
state = state,
return_state = True)
if temperature == 0.0:
next_token = tf.argmax(logits, axis=-1)
else:
logits = logits[:, -1, :] / temperature
next_token = tf.random.categorical(logits, num_samples=1)
# 終了トークンの場合、終了フラグを設定します。
done = done | (next_token == self.end_token)
next_token = tf.where(done, tf.constant(0, dtype=tf.int64), next_token)
return next_token, done, state
next_token, done, state = decoder.get_initial_state(ex_context)
tokens = []
for n in range(10):
# 1ステップ走らせます。
next_token, done, state = decoder.get_next_token(
ex_context, next_token, done, state, temperature=1.0)
tokens.append(next_token)
# 全てのトークンを連結。
tokens = tf.concat(tokens, axis=-1) # (batch, t)
# トークンをテキストに変換。
result = decoder.tokens_to_text(tokens)
result[:3].numpy()
モデル¶
class Translator(tf.keras.Model):
@classmethod
def add_method(cls, fun):
setattr(cls, fun.__name__, fun)
return fun
def __init__(self, units,
context_text_processor,
target_text_processor):
super().__init__()
# Build the encoder and decoder
encoder = Encoder(context_text_processor, units)
decoder = Decoder(target_text_processor, units)
self.encoder = encoder
self.decoder = decoder
def call(self, inputs):
context, x = inputs
context = self.encoder(context)
logits = self.decoder(context, x)
#TODO(b/250038731): remove this
try:
# Delete the keras mask, so keras doesn't scale the loss+accuracy.
del logits._keras_mask
except AttributeError:
pass
return logits
model = Translator(UNITS, context_text_processor, target_text_processor)
logits = model((ex_context_tok, ex_tar_in))
print(f'Context tokens, shape: (batch, s, units) : {ex_context_tok.shape}')
print(f'Target tokens, shape: (batch, t) : {ex_tar_in.shape}')
print(f'logits, shape: (batch, t, target_vocabulary_size): {logits.shape}')
def masked_loss(y_true, y_pred):
# Calculate the loss for each item in the batch.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction='none')
loss = loss_fn(y_true, y_pred)
# Mask off the losses on padding.
mask = tf.cast(y_true != 0, loss.dtype)
loss *= mask
# Return the total.
return tf.reduce_sum(loss)/tf.reduce_sum(mask)
def masked_acc(y_true, y_pred):
# Calculate the loss for each item in the batch.
y_pred = tf.argmax(y_pred, axis=-1)
y_pred = tf.cast(y_pred, y_true.dtype)
match = tf.cast(y_true == y_pred, tf.float32)
mask = tf.cast(y_true != 0, tf.float32)
return tf.reduce_sum(match)/tf.reduce_sum(mask)
model.compile(optimizer='adam',
loss=masked_loss,
metrics=[masked_acc, masked_loss])
vocab_size = 1.0 * target_text_processor.vocabulary_size()
{"expected_loss": tf.math.log(vocab_size).numpy(),
"expected_acc": 1/vocab_size}
model.evaluate(val_ds, steps=20, return_dict=True)
学習¶
history = model.fit(
train_ds.repeat(),
epochs=100,
steps_per_epoch = 100,
validation_data=val_ds,
validation_steps = 20,
callbacks=[
tf.keras.callbacks.EarlyStopping(patience=3)])
plt.plot(history.history['loss'], label='loss')
plt.plot(history.history['val_loss'], label='val_loss')
plt.ylim([0, max(plt.ylim())])
plt.xlabel('Epoch #')
plt.ylabel('CE/token')
plt.legend()
plt.plot(history.history['masked_acc'], label='accuracy')
plt.plot(history.history['val_masked_acc'], label='val_accuracy')
plt.ylim([0, max(plt.ylim())])
plt.xlabel('Epoch #')
plt.ylabel('CE/token')
plt.legend()
@Translator.add_method
def translate(self,
texts, *,
max_length=50,
temperature=0.0):
# Process the input texts
context = self.encoder.convert_input(texts)
batch_size = tf.shape(texts)[0]
# Setup the loop inputs
tokens = []
attention_weights = []
next_token, done, state = self.decoder.get_initial_state(context)
for _ in range(max_length):
# Generate the next token
next_token, done, state = self.decoder.get_next_token(
context, next_token, done, state, temperature)
# Collect the generated tokens
tokens.append(next_token)
attention_weights.append(self.decoder.last_attention_weights)
if tf.executing_eagerly() and tf.reduce_all(done):
break
# Stack the lists of tokens and attention weights.
tokens = tf.concat(tokens, axis=-1) # t*[(batch 1)] -> (batch, t)
self.last_attention_weights = tf.concat(attention_weights, axis=1) # t*[(batch 1 s)] -> (batch, t s)
result = self.decoder.tokens_to_text(tokens)
return result
result = model.translate(['¿Todavía está en casa?']) # Are you still home
result[0].numpy().decode()
@Translator.add_method
def plot_attention(self, text, **kwargs):
assert isinstance(text, str)
output = self.translate([text], **kwargs)
output = output[0].numpy().decode()
attention = self.last_attention_weights[0]
context = tf_lower_and_split_punct(text)
context = context.numpy().decode().split()
output = tf_lower_and_split_punct(output)
output = output.numpy().decode().split()[1:]
fig = plt.figure(figsize=(10, 10))
ax = fig.add_subplot(1, 1, 1)
ax.matshow(attention, cmap='viridis', vmin=0.0)
fontdict = {'fontsize': 14}
ax.set_xticklabels([''] + context, fontdict=fontdict, rotation=90)
ax.set_yticklabels([''] + output, fontdict=fontdict)
ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
ax.yaxis.set_major_locator(ticker.MultipleLocator(1))
ax.set_xlabel('Input text')
ax.set_ylabel('Output text')
model.plot_attention('¿Todavía está en casa?') # Are you still home
long_text = context_raw[-1]
import textwrap
print('Expected output:\n', '\n'.join(textwrap.wrap(target_raw[-1])))
model.plot_attention(long_text)
inputs = [
'Hace mucho frio aqui.', # "It's really cold here."
'Esta es mi vida.', # "This is my life."
'Su cuarto es un desastre.' # "His room is a mess"
]
複数の文章を翻訳する場合、まとめて翻訳した方が速い¶
%%time
for t in inputs:
print(model.translate([t])[0].numpy().decode())
print()
%%time
result = model.translate(inputs)
print(result[0].numpy().decode())
print(result[1].numpy().decode())
print(result[2].numpy().decode())
print()
エクスポート可能にする¶
class Export(tf.Module):
def __init__(self, model):
self.model = model
@tf.function(input_signature=[tf.TensorSpec(dtype=tf.string, shape=[None])])
def translate(self, inputs):
return self.model.translate(inputs)
export = Export(model)
%%time
_ = export.translate(tf.constant(inputs))
%%time
result = export.translate(tf.constant(inputs))
print(result[0].numpy().decode())
print(result[1].numpy().decode())
print(result[2].numpy().decode())
print()
より効率的な実装¶
@Translator.add_method
def translate(self, texts, *, max_length=500, temperature=tf.constant(0.0)):
shape_checker = ShapeChecker()
context = self.encoder.convert_input(texts)
batch_size = tf.shape(context)[0]
shape_checker(context, 'batch s units')
next_token, done, state = self.decoder.get_initial_state(context)
# initialize the accumulator
tokens = tf.TensorArray(tf.int64, size=1, dynamic_size=True)
for t in tf.range(max_length):
# Generate the next token
next_token, done, state = self.decoder.get_next_token(
context, next_token, done, state, temperature)
shape_checker(next_token, 'batch t1')
# Collect the generated tokens
tokens = tokens.write(t, next_token)
# if all the sequences are done, break
if tf.reduce_all(done):
break
# Convert the list of generated token ids to a list of strings.
tokens = tokens.stack()
shape_checker(tokens, 't batch t1')
tokens = einops.rearrange(tokens, 't batch 1 -> batch t')
shape_checker(tokens, 'batch t')
text = self.decoder.tokens_to_text(tokens)
shape_checker(text, 'batch')
return text
%%time
result = model.translate(inputs)
print(result[0].numpy().decode())
print(result[1].numpy().decode())
print(result[2].numpy().decode())
print()
tf.Function
を使ったときに速度に差が出る
# このクラスの実装は前のものと同じ。
class Export(tf.Module):
def __init__(self, model):
self.model = model
@tf.function(input_signature=[tf.TensorSpec(dtype=tf.string, shape=[None])])
def translate(self, inputs):
return self.model.translate(inputs)
export = Export(model)
%%time
_ = export.translate(inputs)
%%time
result = export.translate(tf.constant(inputs))
print(result[0].numpy().decode())
print(result[1].numpy().decode())
print(result[2].numpy().decode())
print()
モデルの保存¶
%%time
tf.saved_model.save(export, 'dynamic_translator',
signatures={'serving_default': export.translate})
モデルのロード¶
%%time
reloaded = tf.saved_model.load('dynamic_translator')
_ = reloaded.translate(tf.constant(inputs)) # warmup
%%time
result = reloaded.translate(tf.constant(inputs))
print(result[0].numpy().decode())
print(result[1].numpy().decode())
print(result[2].numpy().decode())
print()