"""
This script contains:
* MPO:
+ UMPO
"""
import warnings
from typing import (List, Optional, Sequence,
Text, Tuple, Union)
from math import sqrt
import torch
import tensorkrowch.operations as op
from tensorkrowch.components import AbstractNode, Node, ParamNode
from tensorkrowch.components import TensorNetwork
from tensorkrowch.models import MPSData
class MPO(TensorNetwork):  # MARK: MPO
    """
    Class for Matrix Product Operators. This is the base class from which
    :class:`UMPO` inherits.

    Matrix Product Operators are formed by:

    * ``mats_env``: Environment of `matrix` nodes with axes
      ``("left", "input", "right", "output")``.

    * ``left_node``, ``right_node``: `Vector` nodes with axes ``("right",)``
      and ``("left",)``, respectively. These are used to close the boundary
      in the case ``boundary`` is ``"obc"``. Otherwise, both are ``None``.

    In contrast with :class:`MPS`, in ``MPO`` all nodes act both as input and
    output, with corresponding edges dedicated to that. Thus, data nodes will
    be connected to the ``"input"`` edge of all nodes. Upon contraction of the
    whole network, a resultant tensor will be formed, with as many dimensions
    as nodes were in the MPO.

    If all nodes have the same input dimensions, the input data tensor can be
    passed as a single tensor. Otherwise, it would have to be passed as a list
    of tensors with different sizes.

    Parameters
    ----------
    n_features : int, optional
        Number of nodes that will be in ``mats_env``. That is, number of nodes
        without taking into account ``left_node`` and ``right_node``.
    in_dim : int, list[int] or tuple[int], optional
        Input dimension(s). If given as a sequence, its length should be equal
        to ``n_features``.
    out_dim : int, list[int] or tuple[int], optional
        Output dimension(s). If given as a sequence, its length should be equal
        to ``n_features``.
    bond_dim : int, list[int] or tuple[int], optional
        Bond dimension(s). If given as a sequence, its length should be equal
        to ``n_features`` (if ``boundary = "pbc"``) or ``n_features - 1`` (if
        ``boundary = "obc"``). The i-th bond dimension is always the dimension
        of the right edge of the i-th node.
    boundary : {"obc", "pbc"}
        String indicating whether periodic or open boundary conditions should
        be used.
    tensors: list[torch.Tensor] or tuple[torch.Tensor], optional
        Instead of providing ``n_features``, ``in_dim``, ``out_dim``, ``bond_dim``
        and ``boundary``, a list of MPO tensors can be provided. In such case,
        all mentioned attributes will be inferred from the given tensors. All
        tensors should be rank-4 tensors, with shape ``(bond_dim, in_dim,
        bond_dim, out_dim)``. If the first and last elements are rank-3 tensors,
        with shapes ``(in_dim, bond_dim, out_dim)``, ``(bond_dim, in_dim, out_dim)``,
        respectively, the inferred boundary conditions will be "obc". Also, if
        ``tensors`` contains a single element, it can be rank-2 ("obc") or
        rank-4 ("pbc").
    n_batches : int
        Number of batch edges of input ``data`` nodes. Usually ``n_batches = 1``
        (where the batch edge is used for the data batched) but it could also
        be ``n_batches = 2`` (one edge for data batched, other edge for image
        patches in convolutional layers).
    init_method : {"zeros", "ones", "copy", "rand", "randn"}, optional
        Initialization method. Check :meth:`initialize` for a more detailed
        explanation of the different initialization methods.
    device : torch.device, optional
        Device where to initialize the tensors if ``init_method`` is provided.
    dtype : torch.dtype, optional
        Dtype of the tensor if ``init_method`` is provided.
    kwargs : float
        Keyword arguments for the different initialization methods. See
        :meth:`~tensorkrowch.AbstractNode.make_tensor`.

    Examples
    --------
    ``MPO`` with same input/output dimensions:

    >>> mpo = tk.models.MPO(n_features=5,
    ...                     in_dim=2,
    ...                     out_dim=2,
    ...                     bond_dim=5)
    >>> data = torch.ones(20, 5, 2) # batch_size x n_features x feature_size
    >>> result = mpo(data)
    >>> result.shape
    torch.Size([20, 2, 2, 2, 2, 2])

    ``MPO`` with different input/physical dimensions:

    >>> mpo = tk.models.MPO(n_features=5,
    ...                     in_dim=list(range(2, 7)),
    ...                     out_dim=list(range(7, 2, -1)),
    ...                     bond_dim=5)
    >>> data = [torch.ones(20, i)
    ...         for i in range(2, 7)] # n_features * [batch_size x feature_size]
    >>> result = mpo(data)
    >>> result.shape
    torch.Size([20, 7, 6, 5, 4, 3])
    """
def __init__(self,
             n_features: Optional[int] = None,
             in_dim: Optional[Union[int, Sequence[int]]] = None,
             out_dim: Optional[Union[int, Sequence[int]]] = None,
             bond_dim: Optional[Union[int, Sequence[int]]] = None,
             boundary: Text = 'obc',
             tensors: Optional[Sequence[torch.Tensor]] = None,
             n_batches: int = 1,
             init_method: Text = 'randn',
             device: Optional[torch.device] = None,
             dtype: Optional[torch.dtype] = None,
             **kwargs) -> None:
    """
    Builds the MPO network. Either the explicit dimensions
    (``n_features``, ``in_dim``, ``out_dim``, ``bond_dim``, ``boundary``)
    or a list of ``tensors`` must be provided; in the latter case all
    dimensions and the boundary are inferred from the tensors' shapes.
    See the class docstring for the meaning of each parameter.
    """
    super().__init__(name='mpo')

    if tensors is None:
        # boundary
        if boundary not in ['obc', 'pbc']:
            raise ValueError('`boundary` should be one of "obc" or "pbc"')
        self._boundary = boundary

        # n_features
        if not isinstance(n_features, int):
            raise TypeError('`n_features` should be int type')
        elif n_features < 1:
            raise ValueError('`n_features` should be at least 1')
        self._n_features = n_features

        # in_dim: a single int is broadcast to all nodes
        if isinstance(in_dim, (list, tuple)):
            if len(in_dim) != n_features:
                raise ValueError('If `in_dim` is given as a sequence of int, '
                                 'its length should be equal to `n_features`')
            self._in_dim = list(in_dim)
        elif isinstance(in_dim, int):
            self._in_dim = [in_dim] * n_features
        else:
            raise TypeError('`in_dim` should be int, tuple[int] or list[int] '
                            'type')

        # out_dim: same broadcasting rule as in_dim
        if isinstance(out_dim, (list, tuple)):
            if len(out_dim) != n_features:
                raise ValueError('If `out_dim` is given as a sequence of int, '
                                 'its length should be equal to `n_features`')
            self._out_dim = list(out_dim)
        elif isinstance(out_dim, int):
            self._out_dim = [out_dim] * n_features
        else:
            raise TypeError('`out_dim` should be int, tuple[int] or list[int] '
                            'type')

        # bond_dim: "obc" has one fewer bond than nodes, "pbc" has one per node
        if isinstance(bond_dim, (list, tuple)):
            if boundary == 'obc':
                if len(bond_dim) != n_features - 1:
                    raise ValueError(
                        'If `bond_dim` is given as a sequence of int, and '
                        '`boundary` is "obc", its length should be equal '
                        'to `n_features` - 1')
            elif boundary == 'pbc':
                if len(bond_dim) != n_features:
                    raise ValueError(
                        'If `bond_dim` is given as a sequence of int, and '
                        '`boundary` is "pbc", its length should be equal '
                        'to `n_features`')
            self._bond_dim = list(bond_dim)
        elif isinstance(bond_dim, int):
            if boundary == 'obc':
                self._bond_dim = [bond_dim] * (n_features - 1)
            elif boundary == 'pbc':
                self._bond_dim = [bond_dim] * n_features
        else:
            raise TypeError('`bond_dim` should be int, tuple[int] or list[int]'
                            ' type')
    else:
        # Infer all attributes from the given tensors. Expected shapes:
        # rank-4 (left, input, right, output); for "obc" the first tensor may
        # be rank-3 (input, right, output) and the last rank-3
        # (left, input, output); a single tensor may be rank-2 (input, output).
        if not isinstance(tensors, (list, tuple)):
            raise TypeError('`tensors` should be a tuple[torch.Tensor] or '
                            'list[torch.Tensor] type')
        else:
            self._n_features = len(tensors)
            self._in_dim = []
            self._out_dim = []
            self._bond_dim = []
            for i, t in enumerate(tensors):
                if not isinstance(t, torch.Tensor):
                    raise TypeError('`tensors` should be a tuple[torch.Tensor]'
                                    ' or list[torch.Tensor] type')

                if i == 0:
                    # First tensor fixes the boundary condition
                    if len(t.shape) not in [2, 3, 4]:
                        raise ValueError(
                            'The first and last elements in `tensors` '
                            'should be both rank-3 or rank-4 tensors. If'
                            ' the first element is also the last one,'
                            ' it should be a rank-2 tensor')
                    if len(t.shape) == 2:
                        self._boundary = 'obc'
                        self._in_dim.append(t.shape[0])
                        self._out_dim.append(t.shape[1])
                    elif len(t.shape) == 3:
                        self._boundary = 'obc'
                        self._in_dim.append(t.shape[0])
                        self._bond_dim.append(t.shape[1])
                        self._out_dim.append(t.shape[2])
                    else:
                        self._boundary = 'pbc'
                        self._in_dim.append(t.shape[1])
                        self._bond_dim.append(t.shape[2])
                        self._out_dim.append(t.shape[3])

                elif i == (self._n_features - 1):
                    # Last tensor must match the first one's rank
                    if len(t.shape) != len(tensors[0].shape):
                        raise ValueError(
                            'The first and last elements in `tensors` '
                            'should have the same rank. Both should be '
                            'rank-3 or rank-4 tensors. If the first '
                            'element is also the last one, it should '
                            'be a rank-2 tensor')
                    if len(t.shape) == 3:
                        self._in_dim.append(t.shape[1])
                        self._out_dim.append(t.shape[2])
                    else:
                        # "pbc": right edge of last node wraps to left edge
                        # of first node
                        if t.shape[2] != tensors[0].shape[0]:
                            raise ValueError(
                                'If the first and last elements in `tensors`'
                                ' are rank-4 tensors, the first dimension '
                                'of the first element should coincide with'
                                ' the third dimension of the last element')
                        self._in_dim.append(t.shape[1])
                        self._bond_dim.append(t.shape[2])
                        self._out_dim.append(t.shape[3])

                else:
                    # Interior tensors are always rank-4
                    # NOTE(review): "lest" in this message is a typo for
                    # "last" (runtime string, not changed here)
                    if len(t.shape) != 4:
                        raise ValueError(
                            'The elements of `tensors` should be rank-4 '
                            'tensors, except the first and lest elements'
                            ' if boundary is "obc"')
                    self._in_dim.append(t.shape[1])
                    self._bond_dim.append(t.shape[2])
                    self._out_dim.append(t.shape[3])

    # n_batches
    if not isinstance(n_batches, int):
        raise TypeError('`n_batches` should be int type')
    self._n_batches = n_batches

    # Properties (filled in by _make_nodes)
    self._left_node = None
    self._right_node = None
    self._mats_env = []

    # Create Tensor Network
    self._make_nodes()
    self.initialize(tensors=tensors,
                    init_method=init_method,
                    device=device,
                    dtype=dtype,
                    **kwargs)
# ----------
# Properties
# ----------
@property
def n_features(self) -> int:
    """Returns number of nodes in ``mats_env`` (boundary nodes excluded)."""
    return self._n_features
@property
def in_dim(self) -> List[int]:
    """Returns input dimensions, one per node in ``mats_env``."""
    return self._in_dim
@property
def out_dim(self) -> List[int]:
    """Returns output dimensions, one per node in ``mats_env``."""
    return self._out_dim
@property
def bond_dim(self) -> List[int]:
    """Returns bond dimensions (the i-th one is the right edge of node i)."""
    return self._bond_dim
@property
def boundary(self) -> Text:
    """Returns boundary condition ("obc" or "pbc")."""
    return self._boundary
@property
def n_batches(self) -> int:
    """
    Returns number of batch edges of the ``data`` nodes. To change this
    attribute, first call :meth:`~tensorkrowch.TensorNetwork.unset_data_nodes`
    if there are already data nodes in the network.
    """
    return self._n_batches
@n_batches.setter
def n_batches(self, n_batches: int) -> None:
    # Setting is only allowed while the network has no data nodes, since
    # data nodes are created with a fixed number of batch edges.
    if n_batches != self._n_batches:
        if self._data_nodes:
            # Fixed message: this class is an MPO, not an MPS (the original
            # message was copied from the MPS model).
            raise ValueError(
                '`n_batches` cannot be changed if the MPO has data nodes. '
                'Use unset_data_nodes first')
        elif not isinstance(n_batches, int):
            raise TypeError('`n_batches` should be int type')
        self._n_batches = n_batches
@property
def left_node(self) -> Optional[AbstractNode]:
    """Returns the ``left_node`` (``None`` when ``boundary`` is "pbc")."""
    return self._left_node
@property
def right_node(self) -> Optional[AbstractNode]:
    """Returns the ``right_node`` (``None`` when ``boundary`` is "pbc")."""
    return self._right_node
@property
def mats_env(self) -> List[AbstractNode]:
    """Returns the list of nodes in ``mats_env``."""
    return self._mats_env
@property
def tensors(self) -> List[torch.Tensor]:
    """
    Returns the list of MPO tensors. For "obc" boundary, the boundary
    vectors are absorbed into the first and last tensors, which therefore
    come out as rank-3 tensors.
    """
    tensor_list = [node.tensor for node in self._mats_env]
    if self._boundary == 'obc':
        # Absorb the left boundary vector into the first tensor...
        tensor_list[0] = torch.einsum('l,liro->iro',
                                      self.left_node.tensor,
                                      tensor_list[0])
        # ...and the right boundary vector into the last one
        tensor_list[-1] = torch.einsum('liro,r->lio',
                                       tensor_list[-1],
                                       self.right_node.tensor)
    return tensor_list
# -------
# Methods
# -------
def _make_nodes(self) -> None:
    """Creates all the nodes of the MPO and connects their edges."""
    if self._leaf_nodes:
        raise ValueError('Cannot create MPO nodes if the MPO already has '
                         'nodes')

    aux_bond_dim = self._bond_dim

    if self._boundary == 'obc':
        # Single-node "obc" MPO has no bonds: use a dummy bond dimension
        # of 1 for the boundary vectors
        if not aux_bond_dim:
            aux_bond_dim = [1]

        self._left_node = ParamNode(shape=(aux_bond_dim[0],),
                                    axes_names=('right',),
                                    name='left_node',
                                    network=self)
        self._right_node = ParamNode(shape=(aux_bond_dim[-1],),
                                     axes_names=('left',),
                                     name='right_node',
                                     network=self)

        # Extend the list so that, in the loop below, aux_bond_dim[i - 1]
        # (with i = 0 wrapping around to the last element) yields the left
        # dimension of node i, and aux_bond_dim[i] its right dimension
        aux_bond_dim = aux_bond_dim + [aux_bond_dim[-1]] + [aux_bond_dim[0]]

    for i in range(self._n_features):
        # For "pbc", aux_bond_dim[i - 1] naturally wraps for i = 0
        node = ParamNode(shape=(aux_bond_dim[i - 1],
                                self._in_dim[i],
                                aux_bond_dim[i],
                                self._out_dim[i]),
                         axes_names=('left', 'input', 'right', 'output'),
                         name=f'mats_env_node_({i})',
                         network=self)
        self._mats_env.append(node)

        # Connect each node's left edge to the previous node's right edge
        if i != 0:
            self._mats_env[-2]['right'] ^ self._mats_env[-1]['left']

        if self._boundary == 'pbc':
            # Close the ring: last right edge connects to first left edge
            if i == 0:
                periodic_edge = self._mats_env[-1]['left']
            if i == self._n_features - 1:
                self._mats_env[-1]['right'] ^ periodic_edge
        else:
            # Close the chain with the boundary vector nodes
            if i == 0:
                self._left_node['right'] ^ self._mats_env[-1]['left']
            if i == self._n_features - 1:
                self._mats_env[-1]['right'] ^ self._right_node['left']
def initialize(self,
               tensors: Optional[Sequence[torch.Tensor]] = None,
               init_method: Optional[Text] = 'randn',
               device: Optional[torch.device] = None,
               dtype: Optional[torch.dtype] = None,
               **kwargs: float) -> None:
    """
    Initializes all the nodes of the :class:`MPO`. It can be called when
    instantiating the model, or to override the existing nodes' tensors.

    There are different methods to initialize the nodes:

    * ``{"zeros", "ones", "copy", "rand", "randn"}``: Each node is
      initialized calling :meth:`~tensorkrowch.AbstractNode.set_tensor` with
      the given method, ``device``, ``dtype`` and ``kwargs``.

    Parameters
    ----------
    tensors : list[torch.Tensor] or tuple[torch.Tensor], optional
        Sequence of tensors to set in each of the MPO nodes. If ``boundary``
        is ``"obc"``, all tensors should be rank-4, except the first and
        last ones, which can be rank-3, or rank-2 (if the first and last are
        the same). If ``boundary`` is ``"pbc"``, all tensors should be
        rank-4.
    init_method : {"zeros", "ones", "copy", "rand", "randn"}, optional
        Initialization method.
    device : torch.device, optional
        Device where to initialize the tensors if ``init_method`` is provided.
    dtype : torch.dtype, optional
        Dtype of the tensor if ``init_method`` is provided.
    kwargs : float
        Keyword arguments for the different initialization methods. See
        :meth:`~tensorkrowch.AbstractNode.make_tensor`.
    """
    if tensors is not None:
        if len(tensors) != self._n_features:
            raise ValueError('`tensors` should be a sequence of `n_features`'
                             ' elements')

        if self._boundary == 'obc':
            # Shallow copy so the caller's sequence is not mutated
            # NOTE(review): assumes `tensors` is a list here; a tuple would
            # fail on item assignment below -- confirm intended input type
            tensors = tensors[:]

            if device is None:
                device = tensors[0].device
            if dtype is None:
                dtype = tensors[0].dtype

            if len(tensors) == 1:
                # Single rank-2 tensor: promote to rank-4 with dummy bonds
                tensors[0] = tensors[0].reshape(1,
                                                tensors[0].shape[0],
                                                1,
                                                tensors[0].shape[1])
            else:
                # Embed the rank-3 boundary tensors into the rank-4 node
                # shapes, padding with zeros
                # Left node
                aux_tensor = torch.zeros(*self._mats_env[0].shape,
                                         device=device,
                                         dtype=dtype)
                aux_tensor[0] = tensors[0]
                tensors[0] = aux_tensor

                # Right node
                aux_tensor = torch.zeros(*self._mats_env[-1].shape,
                                         device=device,
                                         dtype=dtype)
                aux_tensor[..., 0, :] = tensors[-1]
                tensors[-1] = aux_tensor

        for tensor, node in zip(tensors, self._mats_env):
            node.tensor = tensor

    elif init_method is not None:
        for i, node in enumerate(self._mats_env):
            node.set_tensor(init_method=init_method,
                            device=device,
                            dtype=dtype,
                            **kwargs)
            if self._boundary == 'obc':
                # Mask the boundary nodes so that only the slice attached
                # to the boundary vectors is kept; interior nodes keep the
                # tensor set above. (Fix: the assignment of `aux_tensor`
                # previously happened for every node, which zeroed out all
                # interior node tensors.)
                aux_tensor = torch.zeros(*node.shape,
                                         device=device,
                                         dtype=dtype)
                if i == 0:
                    # Left node: keep only the first "left" slice
                    aux_tensor[0] = node.tensor[0]
                    node.tensor = aux_tensor
                elif i == (self._n_features - 1):
                    # Right node: keep only the first "right" slice
                    aux_tensor[..., 0, :] = node.tensor[..., 0, :]
                    node.tensor = aux_tensor

    if self._boundary == 'obc':
        # Boundary vectors are fixed to the "copy" (basis) tensor
        self._left_node.set_tensor(init_method='copy',
                                   device=device,
                                   dtype=dtype)
        self._right_node.set_tensor(init_method='copy',
                                    device=device,
                                    dtype=dtype)
def set_data_nodes(self) -> None:
    """
    Creates ``data`` nodes and connects each of them to the ``"input"``
    edge of the corresponding node in ``mats_env``.
    """
    edges = []
    for node in self._mats_env:
        edges.append(node['input'])
    super().set_data_nodes(input_edges=edges,
                           num_batch_edges=self._n_batches)
def copy(self, share_tensors: bool = False) -> 'MPO':
    """
    Creates a copy of the :class:`MPO`.

    Parameters
    ----------
    share_tensor : bool, optional
        Boolean indicating whether tensors in the copied MPO should be
        set as the tensors in the current MPO (``True``), or cloned
        (``False``). In the former case, tensors in both MPO's will be
        the same, which might be useful if one needs more than one copy
        of an MPO, but wants to compute all the gradients with respect
        to the same, unique, tensors.

    Returns
    -------
    MPO
    """
    # Build an uninitialized MPO with the same structure
    new_mpo = MPO(n_features=self._n_features,
                  in_dim=self._in_dim,
                  out_dim=self._out_dim,
                  bond_dim=self._bond_dim,
                  boundary=self._boundary,
                  tensors=None,
                  n_batches=self._n_batches,
                  init_method=None,
                  device=None,
                  dtype=None)
    new_mpo.name = self.name + '_copy'

    # Either share or clone each node's tensor
    for new_node, node in zip(new_mpo._mats_env, self._mats_env):
        new_node.tensor = node.tensor if share_tensors else node.tensor.clone()
    return new_mpo
def parameterize(self,
                 set_param: bool = True,
                 override: bool = False) -> 'TensorNetwork':
    """
    Parameterizes all nodes of the MPO. If there are ``resultant`` nodes
    in the MPO, it will be first :meth:`~tensorkrowch.TensorNetwork.reset`.

    Parameters
    ----------
    set_param : bool
        Boolean indicating whether the tensor network has to be parameterized
        (``True``) or de-parameterized (``False``).
    override : bool
        Boolean indicating whether the tensor network should be parameterized
        in-place (``True``) or copied and then parameterized (``False``).
    """
    if self._resultant_nodes:
        warnings.warn(
            'Resultant nodes will be removed before parameterizing the TN')
        self.reset()

    # Work in-place or on a fresh copy, depending on `override`
    net = self if override else self.copy(share_tensors=False)

    # (De-)parameterize every matrix node...
    for idx in range(self._n_features):
        net._mats_env[idx] = net._mats_env[idx].parameterize(set_param)

    # ...and the boundary vectors, if present
    if net._boundary == 'obc':
        net._left_node = net._left_node.parameterize(set_param)
        net._right_node = net._right_node.parameterize(set_param)

    return net
def update_bond_dim(self) -> None:
    """
    Updates the :attr:`bond_dim` attribute of the ``MPO``, in case it is
    outdated.

    If bond dimensions are changed, usually due to decompositions like
    :func:`~tensorkrowch.svd`, ``update_bond_dim`` should be
    called. This might modify some elements of the model, so it is
    recommended to do this before saving the ``state_dict`` of the model.
    Besides, if one wants to continue training, the ``parameters`` of the
    model that are passed to the optimizer should be updated also.
    Otherwise, the optimizer could be tracking outdated parameters that are
    not members of the model any more.
    """
    if self._boundary == 'obc':
        # Bond i is the "right" edge (shape index 2) of node i
        self._bond_dim = [node._shape[2] for node in self._mats_env[:-1]]

        if self._bond_dim:
            # Resize the outer edges (connected to the boundary vectors)
            # so they match the first/last bond dimensions
            left_size = self._bond_dim[0]
            if left_size != self._mats_env[0]._shape[0]:
                self._mats_env[0]['left'].change_size(left_size)

            right_size = self._bond_dim[-1]
            if right_size != self._mats_env[-1]._shape[2]:
                self._mats_env[-1]['right'].change_size(right_size)
    else:
        # "pbc": one bond per node, no boundary vectors to resize
        self._bond_dim = [node._shape[2] for node in self._mats_env]
def _input_contraction(self,
                       nodes_env: List[AbstractNode],
                       input_nodes: List[AbstractNode],
                       inline_input: bool = False) -> List[Node]:
    """
    Contracts input data nodes with MPO nodes.

    Returns the list of resultant matrix nodes (one per MPO node), either
    contracted one by one (``inline_input=True``) or via a single stacked
    contraction. (Return annotation fixed: a single list is returned, not
    a tuple of two lists.)
    """
    if inline_input:
        mats_result = [
            in_node @ node
            for node, in_node in zip(nodes_env, input_nodes)
        ]
        return mats_result
    else:
        if nodes_env:
            # Stack all MPO nodes and all input nodes, contract the two
            # stacks at once, then unbind back into a list of nodes
            stack = op.stack(nodes_env)
            stack_data = op.stack(input_nodes)

            stack ^ stack_data

            result = stack_data @ stack
            mats_result = op.unbind(result)
            return mats_result
        else:
            return []
@staticmethod
def _inline_contraction(mats_env: List[AbstractNode],
                        renormalize: bool = False) -> Node:
    """
    Contracts a sequence of MPO nodes (matrices) inline, from left to
    right, optionally renormalizing along the "right" axes after each
    contraction to keep the norm under control.
    """
    result_node = mats_env[0]
    for mat in mats_env[1:]:
        result_node @= mat

        if renormalize:
            # Renormalize over every axis whose name contains 'right'
            axes = [name for name in result_node.axes_names
                    if 'right' in name]
            if axes:
                result_node = result_node.renormalize(axis=axes)

    return result_node
def _contract_envs_inline(self,
                          mats_env: List[AbstractNode],
                          renormalize: bool = False,
                          mps: Optional[MPSData] = None) -> Node:
    """
    Contracts nodes environments inline, absorbing the MPS and MPO
    boundary vectors (when boundaries are "obc") before the chain
    contraction.
    """
    # Absorb the MPS boundary vectors into the end matrices
    if (mps is not None) and (mps._boundary == 'obc'):
        mats_env[0] = mps._left_node @ mats_env[0]
        mats_env[-1] = mats_env[-1] @ mps._right_node

    # Prepend/append the MPO boundary vectors
    if self._boundary == 'obc':
        mats_env = [self._left_node, *mats_env, self._right_node]

    return self._inline_contraction(mats_env=mats_env,
                                    renormalize=renormalize)
def _aux_pairwise(self,
                  mats_env: List[AbstractNode],
                  renormalize: bool = False) -> Tuple[List[Node],
                                                      List[Node]]:
    """
    Contracts a sequence of MPO nodes (matrices) pairwise.

    Performs one round of pairwise contraction: nodes at even positions
    are stacked and contracted with nodes at odd positions. Returns the
    (roughly halved) list of contracted nodes and the leftover node, if
    the sequence had odd length.
    """
    length = len(mats_env)
    aux_nodes = mats_env
    if length > 1:
        half_length = length // 2
        nice_length = 2 * half_length

        # Pair up nodes; a trailing node (odd length) is left over
        even_nodes = aux_nodes[0:nice_length:2]
        odd_nodes = aux_nodes[1:nice_length:2]
        leftover = aux_nodes[nice_length:]

        # Contract all pairs at once via stacking
        stack1 = op.stack(even_nodes)
        stack2 = op.stack(odd_nodes)

        stack1 ^ stack2

        aux_nodes = stack1 @ stack2

        if renormalize:
            # Renormalize over all 'left'/'right' axes of the stacked result
            axes = []
            for ax_name in aux_nodes.axes_names:
                if ('left' in ax_name) or ('right' in ax_name):
                    axes.append(ax_name)
            if axes:
                aux_nodes = aux_nodes.renormalize(axis=axes)

        aux_nodes = op.unbind(aux_nodes)

        return aux_nodes, leftover
    return mats_env, []
def _pairwise_contraction(self,
                          mats_env: List[Node],
                          mps: Optional[MPSData] = None,
                          renormalize: bool = False) -> Node:
    """
    Contracts nodes environments pairwise.

    Repeatedly halves the chain with :meth:`_aux_pairwise`, accumulating
    leftover (odd) nodes, then recurses on the survivors plus leftovers.
    When a single node remains, finishes with
    :meth:`_contract_envs_inline`.
    """
    length = len(mats_env)
    aux_nodes = mats_env
    if length > 1:
        leftovers = []
        while length > 1:
            aux1, aux2 = self._aux_pairwise(mats_env=aux_nodes,
                                            renormalize=renormalize)
            aux_nodes = aux1
            # Prepend so leftovers keep their original relative order
            leftovers = aux2 + leftovers
            length = len(aux1)

        aux_nodes = aux_nodes + leftovers
        return self._pairwise_contraction(mats_env=aux_nodes,
                                          renormalize=renormalize,
                                          mps=mps)

    return self._contract_envs_inline(mats_env=aux_nodes,
                                      renormalize=renormalize,
                                      mps=mps)
def contract(self,
             inline_input: bool = False,
             inline_mats: bool = False,
             renormalize: bool = False,
             mps: Optional[MPSData] = None) -> Node:
    """
    Contracts the whole MPO with input data nodes. The input can be in the
    form of an :class:`MPSData`, which may be convenient for tensorizing
    vector-matrix multiplication in the form of MPS-MPO contraction.

    If the ``MPO`` is contracted with a ``MPSData``, MPS nodes will become
    part of the MPO network, and they will be connected to the ``"input"``
    edges of the MPO. Thus, the MPS and the MPO should have the same number
    of features (``n_features``).

    Even though it is not necessary to connect the ``MPSData`` nodes to the
    MPO nodes by hand before contraction, it can be done. However, one
    should first move the MPS nodes to the MPO network.

    Also, when contracting the MPO with an ``MPSData``, if any of the
    contraction arguments, ``inline_input`` or ``inline_mats``, is set to
    ``False``, the MPO (already connected to the MPS) should be
    :meth:`~tensorkrowch.TensorNetwork.reset` before contraction if new
    data is set into the ``MPSData`` nodes. This is because :class:`MPSData`
    admits data tensors with different bond dimensions for each iteration,
    and this may cause undesired behaviour when reusing some information of
    previous calls to :func:`~tensorkrowch.stack` with the previous data
    tensors.

    To perform the MPS-MPO contraction, first input data tensors have to
    be put into the :class:`MPSData` via :meth:`MPSData.add_data`. Then,
    contraction is carried out by calling ``mpo(mps=mps_data)``, without
    passing the input data again, as it is already stored in the MPSData
    nodes.

    Parameters
    ----------
    inline_input : bool
        Boolean indicating whether input ``data`` nodes should be contracted
        with the ``MPO`` nodes inline (one contraction at a time) or in a
        single stacked contraction.
    inline_mats : bool
        Boolean indicating whether the sequence of matrices (resultant
        after contracting the input ``data`` nodes) should be contracted
        inline or as a sequence of pairwise stacked contractions.
    renormalize : bool
        Indicates whether nodes should be renormalized after contraction.
        If not, it may happen that the norm explodes or vanishes, as it
        is being accumulated from all nodes. Renormalization aims to avoid
        this undesired behavior by extracting the norm of each node on a
        logarithmic scale. The renormalization only occurs when multiplying
        sequences of matrices, once the `input` contractions have been
        already performed, including contracting against ``MPSData``.
    mps : MPSData, optional
        MPS that is to be contracted with the MPO. New data can be
        put into the MPS via :meth:`MPSData.add_data`, and the MPS-MPO
        contraction is performed by calling ``mpo(mps=mps_data)``, without
        passing the input data again, as it is already stored in the MPS
        cores.

    Returns
    -------
    Node
    """
    if mps is not None:
        if not isinstance(mps, MPSData):
            raise TypeError('`mps` should be MPSData type')
        if mps._n_features != self._n_features:
            raise ValueError(
                '`mps` should have as many features as the MPO')

        # Move MPSData nodes to self
        mps._mats_env[0].move_to_network(self)

        # Connect mps nodes to mpo nodes
        for mps_node, mpo_node in zip(mps._mats_env, self._mats_env):
            mps_node['feature'] ^ mpo_node['input']

    # Contract each MPO node with its "input" neighbour (data or MPS node)
    mats_env = self._input_contraction(
        nodes_env=self._mats_env,
        input_nodes=[node.neighbours('input') for node in self._mats_env],
        inline_input=inline_input)

    if inline_mats:
        result = self._contract_envs_inline(mats_env=mats_env,
                                            renormalize=renormalize,
                                            mps=mps)
    else:
        result = self._pairwise_contraction(mats_env=mats_env,
                                            renormalize=renormalize,
                                            mps=mps)

    # Contract periodic edge: after the chain contraction the node may
    # still be connected to itself through the wrap-around bond
    if result.is_connected_to(result):
        result @= result

    # Put batch edges in first positions
    batch_edges = []
    other_edges = []
    for i, edge in enumerate(result.edges):
        if edge.is_batch():
            batch_edges.append(i)
        else:
            other_edges.append(i)

    all_edges = batch_edges + other_edges
    if all_edges != list(range(len(all_edges))):
        result = op.permute(result, tuple(all_edges))

    return result
@torch.no_grad()
def canonicalize(self,
                 oc: Optional[int] = None,
                 mode: Text = 'svd',
                 rank: Optional[int] = None,
                 cum_percentage: Optional[float] = None,
                 cutoff: Optional[float] = None,
                 renormalize: bool = False) -> None:
    r"""
    Turns MPO into `canonical` form via local SVD/QR decompositions in the
    same way this transformation is applied to :class:`~tensorkrowch.models.MPS`.

    To specify the new bond dimensions, the arguments ``rank``,
    ``cum_percentage`` or ``cutoff`` can be specified. These will be used
    equally for all SVD computations.

    If none of them are specified, the bond dimensions won't be modified
    if possible. Only when the bond dimension is bigger than the physical
    dimension multiplied by the other bond dimension of the node, it will
    be cropped to that size.

    If rank is not specified, the current bond dimensions will be used as
    the rank. That is, the current bond dimensions will be the upper bound
    for the possibly new bond dimensions given by the arguments
    ``cum_percentage`` and/or ``cutoff``.

    Parameters
    ----------
    oc : int
        Position of the orthogonality center. It should be between 0 and
        ``n_features - 1``.
    mode : {"svd", "svdr", "qr"}
        Indicates which decomposition should be used to split a node after
        contracting it. See more at :func:`~tensorkrowch.svd_`,
        :func:`~tensorkrowch.svdr_`, :func:`~tensorkrowch.qr_`.
        If mode is "qr", operation :func:`~tensorkrowch.qr_` will be
        performed on nodes at the left of the output node, whilst operation
        :func:`~tensorkrowch.rq_` will be used for nodes at the right.
    rank : int, optional
        Number of singular values to keep.
    cum_percentage : float, optional
        Proportion that should be satisfied between the sum of all singular
        values kept and the total sum of all singular values.

        .. math::

            \frac{\sum_{i \in \{kept\}}{s_i}}{\sum_{i \in \{all\}}{s_i}} \ge
            cum\_percentage
    cutoff : float, optional
        Quantity that lower bounds singular values in order to be kept.
    renormalize : bool
        Indicates whether nodes should be renormalized after SVD/QR
        decompositions. If not, it may happen that the norm explodes as it
        is being accumulated from all nodes. Renormalization aims to avoid
        this undesired behavior by extracting the norm of each node on a
        logarithmic scale after SVD/QR decompositions are computed. Finally,
        the normalization factor is evenly distributed among all nodes of
        the MPO.

    Examples
    --------
    >>> mpo = tk.models.MPO(n_features=4,
    ...                     in_dim=2,
    ...                     out_dim=2,
    ...                     bond_dim=5)
    >>> mpo.canonicalize(rank=3)
    >>> mpo.bond_dim
    [3, 3, 3]
    """
    self.reset()

    # Disable auto_stack while splitting nodes; restored at the end
    prev_auto_stack = self._auto_stack
    self.auto_stack = False

    if oc is None:
        oc = self._n_features - 1
    elif (oc < 0) or (oc >= self._n_features):
        raise ValueError('Orthogonality center position `oc` should be '
                         'between 0 and `n_features` - 1')

    log_norm = 0

    nodes = self._mats_env[:]
    if self._boundary == 'obc':
        # Zero out the slices not attached to the boundary vectors
        # (in-place; safe under @torch.no_grad())
        nodes[0].tensor[1:] = torch.zeros_like(
            nodes[0].tensor[1:])
        nodes[-1].tensor[..., 1:, :] = torch.zeros_like(
            nodes[-1].tensor[..., 1:, :])

    # If mode is svd or svdr and `rank` is not provided, the current bond
    # sizes are used as the rank, so dimensions are kept when possible
    keep_rank = False
    if rank is None:
        keep_rank = True

    # Left-to-right sweep up to the orthogonality center
    for i in range(oc):
        if mode == 'svd':
            result1, result2 = nodes[i]['right'].svd_(
                side='right',
                rank=nodes[i]['right'].size() if keep_rank else rank,
                cum_percentage=cum_percentage,
                cutoff=cutoff)
        elif mode == 'svdr':
            result1, result2 = nodes[i]['right'].svdr_(
                side='right',
                rank=nodes[i]['right'].size() if keep_rank else rank,
                cum_percentage=cum_percentage,
                cutoff=cutoff)
        elif mode == 'qr':
            result1, result2 = nodes[i]['right'].qr_()
        else:
            raise ValueError('`mode` can only be "svd", "svdr" or "qr"')

        if renormalize:
            # Extract the norm of the non-isometric factor on a log scale
            aux_norm = result2.norm()
            if not aux_norm.isinf() and (aux_norm > 0):
                result2.tensor = result2.tensor / aux_norm
                log_norm += aux_norm.log()

        result1 = result1.parameterize()
        nodes[i] = result1
        nodes[i + 1] = result2

    # Right-to-left sweep down to the orthogonality center
    for i in range(len(nodes) - 1, oc, -1):
        if mode == 'svd':
            result1, result2 = nodes[i]['left'].svd_(
                side='left',
                rank=nodes[i]['left'].size() if keep_rank else rank,
                cum_percentage=cum_percentage,
                cutoff=cutoff)
        elif mode == 'svdr':
            result1, result2 = nodes[i]['left'].svdr_(
                side='left',
                rank=nodes[i]['left'].size() if keep_rank else rank,
                cum_percentage=cum_percentage,
                cutoff=cutoff)
        elif mode == 'qr':
            result1, result2 = nodes[i]['left'].rq_()
        else:
            raise ValueError('`mode` can only be "svd", "svdr" or "qr"')

        if renormalize:
            aux_norm = result1.norm()
            if not aux_norm.isinf() and (aux_norm > 0):
                result1.tensor = result1.tensor / aux_norm
                log_norm += aux_norm.log()

        result2 = result2.parameterize()
        nodes[i] = result2
        nodes[i - 1] = result1

    nodes[oc] = nodes[oc].parameterize()

    # Rescale: distribute the accumulated norm evenly among all nodes
    # NOTE(review): `log_norm != 0` is checked twice; `log_norm` is only
    # nonzero when `renormalize` is True, so the second check subsumes
    # the first
    if log_norm != 0:
        rescale = (log_norm / len(nodes)).exp()

    if renormalize and (log_norm != 0):
        for node in nodes:
            node.tensor = node.tensor * rescale

    # Update variables
    self._mats_env = nodes
    self.update_bond_dim()

    self.auto_stack = prev_auto_stack
class UMPO(MPO):  # MARK: UMPO
    """
    Class for Uniform (translationally invariant) Matrix Product Operators. It is
    the uniform version of :class:`MPO`, that is, all nodes share the same
    tensor. Thus this class cannot have different input/output or bond dimensions
    for each node, and boundary conditions are always periodic (``"pbc"``).

    |

    For a more detailed list of inherited properties and methods,
    check :class:`MPO`.

    Parameters
    ----------
    n_features : int
        Number of nodes that will be in ``mats_env``.
    in_dim : int, optional
        Input dimension.
    out_dim : int, optional
        Output dimension.
    bond_dim : int, optional
        Bond dimension.
    tensor: torch.Tensor, optional
        Instead of providing ``in_dim``, ``out_dim`` and ``bond_dim``, a single
        tensor can be provided. ``n_features`` is still needed to specify how
        many times the tensor should be used to form a finite MPO. The tensor
        should be rank-4, with its first and third dimensions being equal.
    n_batches : int
        Number of batch edges of input ``data`` nodes. Usually ``n_batches = 1``
        (where the batch edge is used for the data batched) but it could also
        be ``n_batches = 2`` (one edge for data batched, other edge for image
        patches in convolutional layers).
    init_method : {"zeros", "ones", "copy", "rand", "randn"}, optional
        Initialization method. Check :meth:`initialize` for a more detailed
        explanation of the different initialization methods.
    device : torch.device, optional
        Device where to initialize the tensors if ``init_method`` is provided.
    dtype : torch.dtype, optional
        Dtype of the tensor if ``init_method`` is provided.
    kwargs : float
        Keyword arguments for the different initialization methods. See
        :meth:`~tensorkrowch.AbstractNode.make_tensor`.

    Examples
    --------
    >>> mpo = tk.models.UMPO(n_features=4,
    ...                      in_dim=2,
    ...                      out_dim=2,
    ...                      bond_dim=5)
    >>> for node in mpo.mats_env:
    ...     assert node.tensor_address() == 'virtual_uniform'
    ...
    >>> data = torch.ones(20, 4, 2) # batch_size x n_features x feature_size
    >>> result = mpo(data)
    >>> result.shape
    torch.Size([20, 2, 2, 2, 2])
    """
def __init__(self,
             n_features: int = None,
             in_dim: Optional[int] = None,
             out_dim: Optional[int] = None,
             bond_dim: Optional[int] = None,
             tensor: Optional[torch.Tensor] = None,
             n_batches: int = 1,
             init_method: Text = 'randn',
             device: Optional[torch.device] = None,
             dtype: Optional[torch.dtype] = None,
             **kwargs) -> None:
    # Either (in_dim, out_dim, bond_dim) or a single rank-4 `tensor` must
    # be provided; when `tensor` is given, it is replicated `n_features`
    # times to build a finite, uniform MPO with periodic boundaries.
    tensors = None
    
    # n_features
    if not isinstance(n_features, int):
        raise TypeError('`n_features` should be int type')
    elif n_features < 1:
        raise ValueError('`n_features` should be at least 1')
    
    if tensor is None:
        # Explicit dimensions are required when no tensor is given
        # in_dim
        if not isinstance(in_dim, int):
            raise TypeError('`in_dim` should be int type')
        
        # out_dim
        if not isinstance(out_dim, int):
            raise TypeError('`out_dim` should be int type')
        
        # bond_dim
        if not isinstance(bond_dim, int):
            raise TypeError('`bond_dim` should be int type')
    else:
        if not isinstance(tensor, torch.Tensor):
            raise TypeError('`tensor` should be torch.Tensor type')
        if len(tensor.shape) != 4:
            raise ValueError('`tensor` should be a rank-4 tensor')
        if tensor.shape[0] != tensor.shape[2]:
            # The compared axes are "left" (dim 0) and "right" (dim 2);
            # they must coincide for the shared tensor to close into a
            # periodic chain. (Fixed message: it previously said "first
            # and last dimensions" and "MPS" instead of "MPO".)
            raise ValueError('`tensor` first and third dimensions should'
                             ' be equal so that the MPO can have '
                             'periodic boundary conditions')
        
        # The same tensor occupies every site of the finite MPO
        tensors = [tensor] * n_features
    
    super().__init__(n_features=n_features,
                     in_dim=in_dim,
                     out_dim=out_dim,
                     bond_dim=bond_dim,
                     boundary='pbc',
                     tensors=tensors,
                     n_batches=n_batches,
                     init_method=init_method,
                     device=device,
                     dtype=dtype,
                     **kwargs)
    self.name = 'umpo'
def _make_nodes(self) -> None:
    """Creates all the nodes of the MPO."""
    super()._make_nodes()
    
    # Single virtual node owning the tensor that every matrix node shares
    shared_shape = (self._bond_dim[0],
                    self._in_dim[0],
                    self._bond_dim[0],
                    self._out_dim[0])
    self.uniform_memory = ParamNode(shape=shared_shape,
                                    axes_names=('left', 'input',
                                                'right', 'output'),
                                    name='virtual_uniform',
                                    network=self,
                                    virtual=True)
    
    # Point each matrix node's memory at the shared virtual node
    for mat_node in self._mats_env:
        mat_node.set_tensor_from(self.uniform_memory)
def initialize(self,
               tensors: Optional[Sequence[torch.Tensor]] = None,
               init_method: Optional[Text] = 'randn',
               device: Optional[torch.device] = None,
               dtype: Optional[torch.dtype] = None,
               **kwargs: float) -> None:
    """
    Initializes the common tensor of the :class:`UMPO`. It can be called
    when instantiating the model, or to override the existing nodes' tensors.
    
    There are different methods to initialize the nodes:
    
    * ``{"zeros", "ones", "copy", "rand", "randn"}``: The tensor is
      initialized calling :meth:`~tensorkrowch.AbstractNode.set_tensor` with
      the given method, ``device``, ``dtype`` and ``kwargs``.
    
    Parameters
    ----------
    tensors : list[torch.Tensor] or tuple[torch.Tensor], optional
        Sequence of a single tensor to set in each of the MPO nodes. The
        tensor should be rank-4, with its first and third dimensions being
        equal.
    init_method : {"zeros", "ones", "copy", "rand", "randn"}, optional
        Initialization method.
    device : torch.device, optional
        Device where to initialize the tensors if ``init_method`` is provided.
    dtype : torch.dtype, optional
        Dtype of the tensor if ``init_method`` is provided.
    kwargs : float
        Keyword arguments for the different initialization methods. See
        :meth:`~tensorkrowch.AbstractNode.make_tensor`.
    """
    if tensors is None:
        if init_method is not None:
            self.uniform_memory.set_tensor(init_method=init_method,
                                           device=device,
                                           dtype=dtype,
                                           **kwargs)
    else:
        # Only the first tensor matters: all nodes share the same memory
        self.uniform_memory.tensor = tensors[0]
def copy(self, share_tensors: bool = False) -> 'UMPO':
    """
    Creates a copy of the :class:`UMPO`.
    
    Parameters
    ----------
    share_tensors : bool, optional
        Boolean indicating whether the common tensor in the copied UMPO
        should be set as the tensor in the current UMPO (``True``), or
        cloned (``False``). In the former case, the tensor in both UMPO's
        will be the same, which might be useful if one needs more than one
        copy of a UMPO, but wants to compute all the gradients with respect
        to the same, unique, tensor.
    
    Returns
    -------
    UMPO
    """
    copied = UMPO(n_features=self._n_features,
                  in_dim=self._in_dim[0],
                  out_dim=self._out_dim[0],
                  bond_dim=self._bond_dim[0],
                  tensor=None,
                  n_batches=self._n_batches,
                  init_method=None,
                  device=None,
                  dtype=None)
    copied.name = self.name + '_copy'
    
    # Either alias the shared tensor or clone it into the new model
    source = self.uniform_memory.tensor
    copied.uniform_memory.tensor = source if share_tensors else source.clone()
    return copied
def parameterize(self,
                 set_param: bool = True,
                 override: bool = False) -> 'TensorNetwork':
    """
    Parameterizes all nodes of the MPO. If there are ``resultant`` nodes
    in the MPO, it will be first :meth:`~tensorkrowch.TensorNetwork.reset`.
    
    Parameters
    ----------
    set_param : bool
        Boolean indicating whether the tensor network has to be parameterized
        (``True``) or de-parameterized (``False``).
    override : bool
        Boolean indicating whether the tensor network should be parameterized
        in-place (``True``) or copied and then parameterized (``False``).
    """
    if self._resultant_nodes:
        warnings.warn(
            'Resultant nodes will be removed before parameterizing the TN')
        self.reset()
    
    net = self if override else self.copy(share_tensors=False)
    
    for idx, mat_node in enumerate(net._mats_env):
        net._mats_env[idx] = mat_node.parameterize(set_param)
    
    # The uniform memory must be (de)parameterized only AFTER all the
    # matrix nodes have been processed
    net.uniform_memory = net.uniform_memory.parameterize(set_param)
    
    # Re-point every matrix node at the (possibly replaced) uniform memory
    for mat_node in net._mats_env:
        mat_node.set_tensor_from(net.uniform_memory)
    return net
def canonicalize(self,
oc: Optional[int] = None,
mode: Text = 'svd',
rank: Optional[int] = None,
cum_percentage: Optional[float] = None,
cutoff: Optional[float] = None,
renormalize: bool = False) -> None:
""":meta private:"""
raise NotImplementedError(
'`canonicalize` not implemented for UMPO')