# mu-mimo/mu_mimo/processing/precoding.py
from __future__ import annotations
from abc import ABC, abstractmethod
import numpy as np
import scipy as sp
from ..types import ComplexArray, RealArray, IntArray, BitArray, ChannelStateInformation
[docs]
class Precoder(ABC):
"""
The Precoder Abstract Base Class (ABC).
A precoder class is responsible for implementing a precoding strategy and effectively precoding the transmitted signals in the base station.\\
In case of coordinated beamforming, the precoder is responsible for computing the combining matrices for each UT as well! These are then later sent to the UTs.
In addition, the precoder is responsible for computing the equalization coefficients for each data stream. These will be used by the UTs to correctly rescale the received symbols before the decoding process.
"""
[docs]
@staticmethod
@abstractmethod
def compute(csi: ChannelStateInformation, Pt: float, K: int,) -> tuple[ComplexArray, ComplexArray | None, ComplexArray]:
"""
Compute the compound precoding matrix.\\
In case of coordinated beamforming, the combining matrices for each UT are computed as well.
In addition, the equalization coefficients for each data stream are computed as well. These will be used by the UTs to correctly rescale the received symbols before the decoding process.
Parameters
----------
csi : ChannelStateInformation
The channel state information (CSI) of the system.
Pt : float
The total transmit power available at the BS.
K : int
The number of user terminals (UTs) in the system.
Returns
-------
F : ComplexArray, shape (Nt, K*Nr)
The compound precoding matrix for all UTs. It contains the power allocation across the data streams as well.
G : ComplexArray, shape (K*Nr, K*Nr) or None
The compound combining matrix (block diagonal) for all UTs in case of coordinated beamforming. None otherwise.
C_eq : ComplexArray, shape (K*Nr,)
The equalization coefficients for all UTs.
"""
raise NotImplementedError
[docs]
@staticmethod
def apply(a: ComplexArray, F: ComplexArray, ibr: IntArray) -> ComplexArray:
"""
Apply the precoding matrix to the data symbols.
Parameters
----------
a : ComplexArray, shape (Ns_total, M)
The data symbol streams for all UTs.
F : ComplexArray, shape (Nt, K*Nr)
The compound precoding matrix for all UTs.
ibr : IntArray, shape (K*Nr,)
The number of bits per symbol for each data stream (active and inactve).
Returns
-------
x : ComplexArray, shape (Nt, M)
The precoded signal to be transmitted by the BS.
"""
x = F[:, ibr > 0] @ a
return x
[docs]
class NeutralPrecoder(Precoder):
"""
Neutral Precoder.
This precoder acts as a 'neutral element' for precoding.\\
It does not perform any precoding and simply passes the data symbols through without any modification.
In addition, the power allocation is uniform across the data streams (so no real power allocation is performed either).
Finally, in case of coordinated beamforming, the combining matrices for each UT are set to the identity matrix (so no real combining is performed either).
"""
[docs]
@staticmethod
def compute(csi: ChannelStateInformation, Pt: float, K: int) -> tuple[ComplexArray, ComplexArray | None, ComplexArray]:
Nt = csi.H_eff.shape[1]
Nr = csi.H_eff.shape[0] // K
F = np.eye(Nt, K*Nr)
G = np.eye(K*Nr)
C_eq = np.ones(K*Nr)
return F, G, C_eq
[docs]
class SVDPrecoder(Precoder):
"""
Singular Value Decomposition (SVD) Precoder.
The optimal precoding strategy for a single-user MIMO system.\\
This precoding strategy is only available for single-user systems, since it requires combining all data streams.
"""
[docs]
@staticmethod
def compute(csi: ChannelStateInformation, Pt: float, K: int) -> tuple[ComplexArray, ComplexArray | None, ComplexArray]:
# Validate that the SVD precoding strategy is only applied in single-user systems.
if K > 1:
raise ValueError("The SVD precoding strategy is only available for single-user systems. Please choose a different precoding strategy for multi-user systems.")
# Execute the SVD of the effective channel matrix.
U, Sigma, Vh = np.linalg.svd(csi.H_eff, full_matrices=False)
# Determine the precoder and combiner matrices.
F = Vh.conj().T
G = U.conj().T
# Determine the optimal power allocation across all the data streams.
gamma = (Sigma**2) * (csi.snr / Pt)
P = waterfilling_v1(gamma=gamma, pt=Pt)
FP = F @ np.diag(np.sqrt(P))
# Compute the equalization coefficients.
C_eq = np.diag( G @ csi.H_eff @ FP )
return FP, G, C_eq
[docs]
class ZFPrecoder(Precoder):
"""
Zero-Forcing (ZF) Precoder.
The precoder aims to completely eliminate all interference at the user terminals.\\
The precoding matrix is therefore computed as the pseudo-inverse of the effective channel matrix H_eff.
In addition, the power allocation across the data streams is optimal in the sense that it maximizes the sum rate of the system under the total power constraint Pt.\\
"""
[docs]
@staticmethod
def compute(csi: ChannelStateInformation, Pt: float, K: int) -> tuple[ComplexArray, ComplexArray | None, ComplexArray]:
# The combining matrix is not computed in the ZF precoding strategy.
G = None
# The precoding matrix is computed as the pseudo-inverse of the effective channel matrix.
F = np.linalg.pinv(csi.H_eff)
F_norm = F / np.linalg.norm(F, axis=0)
# The power allocation across the data streams is computed using the waterfilling algorithm to maximize the sum rate under the total power constraint Pt.
gamma = (1 / np.linalg.norm(F, axis=0)) * (csi.snr / Pt)
P = waterfilling_v1(gamma=gamma, pt=Pt)
FP = F_norm @ np.diag(np.sqrt(P))
# Compute the equalization coefficients.
C_eq = np.diag( csi.H_eff @ FP )
return FP, G, C_eq
[docs]
class BDPrecoder(Precoder):
"""
Block Diagonalization (BD) Precoder.
Firstly, the multi-user precoder aims to completely eliminate all inter-user interference at the user terminals. This is achieved by choosing the multi-user precoding matrix of each UT such that it spans the null space of the interfering channel matrix of that UT.\\
Secondly, the single-user precoder for each UT is computed as the SVD precoder of the effective channel matrix of that UT after applying the multi-user precoding matrix. Analogue to SVD precoding in a SU-MIMO system. Same for the single-user combiner.
"""
[docs]
@staticmethod
def compute(csi: ChannelStateInformation, Pt: float, K: int) -> tuple[ComplexArray, ComplexArray | None, ComplexArray]:
# Parameter initialization.
H = csi.H_eff
Nr = H.shape[0] // K
Nt = H.shape[1]
# STEP 1: the multi-user precoding matrix.
r_ring = Nt - (K-1)*Nr
F1 = np.empty((Nt, K*r_ring), dtype=complex)
for k in range(K):
H_ring_k = np.delete(H, slice(k*Nr, (k+1)*Nr), axis=0)
_, _, Vh_ring_k = sp.linalg.svd(H_ring_k)
V_ring_k = Vh_ring_k.conj().T
F1_k = V_ring_k[:, Nt-r_ring : Nt]
F1[:, k*r_ring : (k+1)*r_ring] = F1_k
# STEP 2: the single-user precoding matrix.
F2 = np.zeros((K*r_ring, K*Nr), dtype=complex)
G = np.zeros((K*Nr, K*Nr), dtype=complex)
for k in range(K):
H_k = H[k*Nr : (k+1)*Nr, :]
F1_k = F1[:, k*r_ring : (k+1)*r_ring]
U_k, Sigma_k, Vh_k = np.linalg.svd(H_k @ F1_k, full_matrices=False)
F2_k = Vh_k.conj().T[:, :Nr]
G_k = U_k.conj().T
F2[k*r_ring : (k+1)*r_ring, k*Nr : (k+1)*Nr] = F2_k
G[k*Nr : (k+1)*Nr, k*Nr : (k+1)*Nr] = G_k
# STEP 3: the power allocation.
F = F1 @ F2
T = G @ H @ F
gamma = np.abs(np.diagonal(T))**2 * (csi.snr / Pt)
P = waterfilling_v1(gamma=gamma, pt=Pt)
FP = F @ np.diag(np.sqrt(P))
# Compute the equalization coefficients.
C_eq = np.diag( G @ H @ FP )
return FP, G, C_eq
[docs]
class WMMSEPrecoder(Precoder):
pass
def waterfilling_v1(gamma: RealArray, pt: float) -> RealArray:
r"""
Waterfilling algorithm.
This function implements the waterfilling algorithm to find the optimal power allocation across N transmission streams, given the channel-to-noise ratio (CNR) coefficients `gamma` and the total available transmit power `pt`.
In particular, it solves the following constraint optimization problem:
.. math::
\begin{aligned}
& \underset{\{p_n\}}{\text{max}}
& & \sum_{n=1}^{N} \log_2 \left( 1 + \gamma_n \, p_n \right) \\
& \text{s. t.}
& & \sum_{n=1}^{N} p_n = p_t \\
& & & \forall n \in \{1, \ldots, N\} : \, p_n \geq 0
\end{aligned}
Parameters
----------
gamma : RealArray, shape (N,)
Channel-to-Noise Ratio (CNR) coefficients for each eigenchannel.
pt : float
Total available transmit power.
Returns
-------
p : RealArray, shape (N,)
Optimal power allocation across the eigenchannels.
"""
# STEP 0: Sort the CNR coefficients in descending order.
sorted_indices = np.argsort(gamma)[::-1]
gamma = gamma[sorted_indices]
# STEP 1: Determine the number of active streams.
pt_iter = lambda as_iter: np.sum( (1 / gamma[as_iter]) - (1 / gamma[:as_iter]) )
as_UB = len(gamma)
as_LB = 0
while as_UB - as_LB > 1:
as_iter = (as_UB + as_LB) // 2
if pt > pt_iter(as_iter): as_LB = as_iter
elif pt <= pt_iter(as_iter): as_UB = as_iter
# STEP 2: Compute the optimal power allocation for each active stream.
p_step1 = ( (1 / gamma[as_LB]) - (1 / gamma[:as_LB]) )
p_step1 = np.concatenate( (p_step1, np.zeros(as_UB - as_LB)) )
power_remaining = pt - np.sum(p_step1)
p_step2 = (1 / as_UB) * power_remaining
p_sorted = np.concatenate( (p_step1 + p_step2, np.zeros(len(gamma) - as_UB)) )
# STEP 3: Reorder the power allocation to match the original order of the streams.
p = np.empty_like(p_sorted)
p[sorted_indices] = p_sorted
return p