This notebook illustrates how to perform a simulation of a transmission through an AWGN channel using QPSK modulation. It also shows how to use the features in `pyphysim`

to implement "simulators" and how they can have with some boiler plate code and add functionality such as progressbar, saving results to a file and running simulations in parallel.

```
In [1]:
```%matplotlib inline
import math
import numpy as np
from matplotlib import pyplot as plt
from pyphysim.modulators.fundamental import BPSK, QAM, QPSK, Modulator
from pyphysim.simulations import Result, SimulationResults, SimulationRunner
from pyphysim.util.conversion import dB2Linear
from pyphysim.util.misc import pretty_time, randn_c
np.set_printoptions(precision=2, linewidth=120)

```
In [2]:
```qpsk = QPSK()
bpsk = BPSK()
qam16 = QAM(16)
qam64 = QAM(64)
fig, [[ax11, ax12], [ax21, ax22]] = plt.subplots(figsize=(10, 10),
nrows=2,
ncols=2)
ax11.set_title("BPSK")
ax11.plot(bpsk.symbols.real, bpsk.symbols.imag, "*r", label="BPSK")
ax11.axis("equal")
ax12.set_title("QPSK")
ax12.plot(qpsk.symbols.real, qpsk.symbols.imag, "*r", label="QPSK")
ax12.axis("equal")
ax21.set_title("16-QAM")
ax21.plot(qam16.symbols.real, qam16.symbols.imag, "*r", label="16-QAM")
ax21.axis("equal")
ax22.set_title("64-QAM")
ax22.plot(qam64.symbols.real, qam64.symbols.imag, "*r", label="64-QAM")
ax22.axis("equal");

```
```

```
In [3]:
```qpsk = QPSK()
num_symbols = int(1e3)
# We need the data to be in the interval [0, M), where M is the
# number of symbols in the constellation
data_qpsk = np.random.randint(0, qpsk.M, size=num_symbols)
modulated_data_qpsk = qpsk.modulate(data_qpsk)

```
In [4]:
```SNR_dB = 20
snr_linear = dB2Linear(SNR_dB)
noise_power = 1 / snr_linear
# Noise vector
n = math.sqrt(noise_power) * randn_c(num_symbols)

```
In [5]:
```# received_data_qam = su_channel.corrupt_data(modulated_data_qam)
received_data_qpsk = modulated_data_qpsk + n

```
In [6]:
```# Received data
fig, ax = plt.subplots(figsize=(5, 5))
ax.plot(received_data_qpsk.real, received_data_qpsk.imag, "*")
ax.axis("equal")
# fig.show()

```
Out[6]:
```

```
In [7]:
```demodulated_data_qpsk = qpsk.demodulate(received_data_qpsk)
symbol_error_rate_qpsk = 1 - sum(
demodulated_data_qpsk == data_qpsk) / demodulated_data_qpsk.size
print("Error QPSK:", symbol_error_rate_qpsk)

```
```

Delete previous variables

```
In [8]:
```del SNR_dB, ax, ax11, ax12, ax21, ax22, bpsk, data_qpsk, demodulated_data_qpsk, fig, modulated_data_qpsk, n, noise_power, num_symbols, qam16, qam64, qpsk, received_data_qpsk, snr_linear, symbol_error_rate_qpsk

```
In [9]:
```def simulate_awgn(modulator: Modulator, num_symbols: int, noise_power: float,
num_reps: int):
"""Return the symbol error rate"""
symbol_error_rate = 0.0
for rep in range(num_reps):
data = np.random.randint(0, modulator.M, size=num_symbols)
modulated_data = modulator.modulate(data)
# Noise vector
n = math.sqrt(noise_power) * randn_c(num_symbols)
# received_data_qam = su_channel.corrupt_data(modulated_data_qam)
received_data = modulated_data + n
demodulated_data = modulator.demodulate(received_data)
symbol_error_rate += 1 - sum(
demodulated_data == data) / demodulated_data.size
return symbol_error_rate / num_reps

```
In [10]:
```qpsk = QPSK()
num_symbols = int(1e3)
SNR_dB = 5
snr_linear = dB2Linear(SNR_dB)
noise_power = 1 / snr_linear
# We run twice just to check that the number of simulated symbols and repetitions is enouth to get a proper value
symbol_error1 = simulate_awgn(qpsk, num_symbols, noise_power, num_reps=5000)
symbol_error2 = simulate_awgn(qpsk, num_symbols, noise_power, num_reps=5000)

```
In [11]:
```print(f"Obtained symbol error for SNR {SNR_dB}: {symbol_error1}")
print(f"Obtained symbol error for SNR {SNR_dB}: {symbol_error2}")
# Let's print the theoretical value
print(
f"\nTheoretical symbol error for SNR {SNR_dB}: {qpsk.calcTheoreticalSER(SNR_dB)}"
)

```
```

The previously implemented `simulate_awgn`

function is very simple and we only need to call it with different values of noise power to get the values for a "SNR x Symbol Error Rate" plot.

However, this is a good oportunity to see what the classes in the `pyphysim.simulations`

module can offer.

To create a "simulator" we need to create a subclass of `pyphysim.simulations.SimulationRunner`

and implement the `_run_simulation`

method. This method should perform a single iteration (similar to the previous `simulate_awgn`

function, but without the loop) and return a `SimulationResults`

object containing whatever results you want to save from that iteration. These results will be automatically combined for the many calls of `_run_simulation`

method that will be performed when the `simulate`

of the `SimulationRunner`

base class is called.

In the `__init__`

of your simulatr class you must at least do two things, add the desired "simulation parameters" and set a value for `self.rep_max`

(default is 1) with the number of times you want to call `_run_simulation`

.

```
In [12]:
```class AwgnSimulator(SimulationRunner):
def __init__(self, SINR_dB_values):
super().__init__()
# Add the simulation parameters to the `params` attribute.
self.params.add('SNR_db', SINR_dB_values)
# Optionally set some parameter(s) to be unpacked
self.params.set_unpack_parameter('SNR_db')
# Number of times the `_run_simulation` method will run when `simulate` method is called
self.rep_max = 500
# We can save anything that does not change in `_run_simulation` as attributes
self.modulator = QPSK()
def _run_simulation(self, current_parameters):
# Since SNR_db is an "unpacked parameter" a single value is passed to `_run_simulation`.
# We can get the current value as below
sinr_dB = current_parameters['SNR_db']
# Number of symbols generated for this realization
num_symbols = 1000
# Find the noise power from the SNR value (in dB)
snr_linear = dB2Linear(sinr_dB)
noise_power = 1 / snr_linear
# Generate random transmit data and modulate it
data = np.random.randint(0, self.modulator.M, size=num_symbols)
modulated_data = self.modulator.modulate(data)
# Noise vector
n = math.sqrt(noise_power) * randn_c(num_symbols)
# Receive the corrupted data
received_data = modulated_data + n
# Demodulate the received data and compute the number of symbol errors
demodulated_data = self.modulator.demodulate(received_data)
symbol_errors = sum(demodulated_data != data)
# Create a SimulationResults object and save the symbol error rate.
# Note that the symbol error rate is given by the number of symbol errors divided by the number of
# transmited symbols. We want to combine the symbol error rate for the many calls of `_run_simulation`.
# Thus, we choose `Result.RATIOTYPE` as the "update_type". See the documentation of the `Result` class
# for more about it.
simResults = SimulationResults()
simResults.add_new_result(
"symbol_error_rate",
Result.RATIOTYPE,
value=symbol_errors,
total=num_symbols) # Add one each result you want
return simResults

Now we can run the simulation. Let's create an AwgnSimulator object and check its parameters.

```
In [13]:
```SNR_db = np.linspace(-5, 15, 9)
runner = AwgnSimulator(SNR_db)
# We can see the simulation parameters using the `params` attribute of the `SimulationRunner`
print(runner.params)

```
```

We have only added the `SNR_db`

parameter. The `*`

in the name of the `SNR_db`

parameter indicates that this parameter will be *unpacked*. This means that the `_run_simulation`

method will receive un element of it at a time instead of the whole array.

Now we can finally call the `simulate`

method to perform the simulation. Notice the nice progressbar and that we have 9 variations. Each variation corresponds to a different SNR value. If we had added other parameters that should also be unpacked then we would get a different variation for each combination of unpacked parameters.

```
In [14]:
```runner.simulate()

```
```

`results`

atribute of our simulator object. We can call the `get_result_values_list`

method to get the actual values. Let's first check what is stored there.

```
In [15]:
```runner.results

```
Out[15]:
```

`_run_simulation`

method was called. Other stored information are the "elapsed_time" and "num_skipped_reps" (more on this one later).

```
In [16]:
```print("Symbol Errors:\n",
np.array(runner.results.get_result_values_list("symbol_error_rate")))
elapsed_times = runner.results.get_result_values_list("elapsed_time")
print("\nElapsed times:\n", np.array(elapsed_times))
# pretty_time receives a float number corresponding to an elapsed time in seconds and returns a nice string
print(f"\nTotal elapsed time: {pretty_time(sum(elapsed_times))}")

```
```

Now we can finally see the plot and compare the simulated results with theoretical values.

```
In [17]:
```# Now let's plot the results
fig, ax = plt.subplots(figsize=(5, 5))
ax.semilogy(SNR_db, qpsk.calcTheoreticalSER(SNR_db), "--", label="Theoretical")
ax.semilogy(SNR_db,
runner.results.get_result_values_list("symbol_error_rate"),
label="Simulated")
ax.set_title("QPSK Symbol Error Rate (AWGN channel)")
ax.set_ylabel("Symbol Error Rate")
ax.set_xlabel("SNR (dB)")
ax.legend();

```
```

The simulated values match the theoretical values up to some point, where we can see that we need to simulate more symbols to get the proper symbol error rate. The easy way is to increase the `rep_max`

attribute in our simulator, but this will simulate more symbols also for lower SNR values, which will unecesarelly make the simulation slower. Ideally, we want to simulate a large number of symbols for high SNR values and a low number of symbols for low SNR values.

One way to do this is to set `rep_max`

to the highest value we want (for the high SNR values) and implement the `_keep_going`

method. This method is called each iteration to indicate if we need more iterations of if we can stop early (for a particular variation of parameters). Since we didn't implement it before, the default implementation was used, which just returns True.

The `_keep_going`

should be simple to avoid creating unecessary overhead (is cost must be much lower than `_run_simulation`

). One possible implementation if to return True as long as the number of cumulative symbol errors is below some value. That is, for low SNR values this value will be reached before the number of times `_run_simulation`

is called reach `rep_max`

, while for high SNR values this maximum number of sumbol errors might never be reached and `_run_simulation`

is called `rep_max`

times.

The changed implementation of our simulator is given below.

**Note**: We have set `self.progressbar_message`

in the `__init__`

method. This allow us to change the message of the progressbar and we can use parameters names in the message, which is automatically replaced by the current value (if unpacked).

```
In [18]:
```class AwgnSimulator2(SimulationRunner):
def __init__(self, SINR_dB_values):
super().__init__()
# Add the simulation parameters to the `params` attribute.
self.params.add('SNR_db', SINR_dB_values)
# Optionally set some parameter(s) to be unpacked
self.params.set_unpack_parameter('SNR_db')
# Number of times the `_run_simulation` method will run when `simulate` method is called.
# We are using a value 100 times larger than before, but the simulation will not take
# 100 times the previous elapsed time to finish thanks to the implementation of the
# `_keep_going` method that will allow us to skip many of these iterations for low SNR values
self.rep_max = 50000
# Number of symbols generated for this realization
self.num_symbols = 1000
# Used in the implementation of `_keep_going` method. This is the maximum numbers of symbol
# errors we allow before `_run_simulation` is stoped for a given configuration
self.max_symbol_errors = 1. / 1000. * self.num_symbols * self.rep_max
# We can save anything that does not change in `_run_simulation` as attributes
self.modulator = QPSK()
# Set a nice message for the progressbar
self.progressbar_message = "Simulating for SNR {SNR_db}"
# Change the progressbar "style" to something better for the notebook
# Possible values are 'text1', 'text2' and 'ipython'
# self.update_progress_function_style = "ipython"
def _keep_going(self, current_params, current_sim_results, current_rep):
# Note that we have added a "symbol_errors" result in `_run_simulation` to use here
# Get the last value in the "symbol_errors" results list, which corresponds to the current configuration
cumulated_symbol_errors \
= current_sim_results['symbol_errors'][-1].get_result()
return cumulated_symbol_errors < self.max_symbol_errors
def _run_simulation(self, current_parameters):
# Since SNR_db is an "unpacked parameter" a single value is passed to `_run_simulation`.
# We can get the current value as below
sinr_dB = current_parameters['SNR_db']
# Find the noise power from the SNR value (in dB)
snr_linear = dB2Linear(sinr_dB)
noise_power = 1 / snr_linear
# Generate random transmit data and modulate it
data = np.random.randint(0, self.modulator.M, size=self.num_symbols)
modulated_data = self.modulator.modulate(data)
# Noise vector
n = math.sqrt(noise_power) * randn_c(self.num_symbols)
# Receive the corrupted data
received_data = modulated_data + n
# Demodulate the received data and compute the number of symbol errors
demodulated_data = self.modulator.demodulate(received_data)
symbol_errors = sum(demodulated_data != data)
# Create a SimulationResults object and save the symbol error rate.
# Note that the symbol error rate is given by the number of symbol errors divided by the number of
# transmited symbols. We want to combine the symbol error rate for the many calls of `_run_simulation`.
# Thus, we choose `Result.RATIOTYPE` as the "update_type". See the documentation of the `Result` class
# for more about it.
simResults = SimulationResults()
simResults.add_new_result("symbol_error_rate",
Result.RATIOTYPE,
value=symbol_errors,
total=self.num_symbols)
simResults.add_new_result("symbol_errors",
Result.SUMTYPE,
value=symbol_errors)
return simResults

Now we can run the simulation. We will also call the `set_results_filename`

so that the results are saved to the disk. The results will be saved using pickle. Furthermore, partial results are also saved (inside a "partial_results" folder) and you can interrupt the simulation and continue from where it was interrupted.

If you do not set the results filename before starting the simulation, you can also save the results by calling the `save_to_file`

method of the `runner2.results`

object (although in this case partial results will not be saved during the simulation).

```
In [19]:
```runner2 = AwgnSimulator2(SNR_db)
# Set the name name of the results file
# If the file extension is not provided, then pickle will be used to save the results
# If an extension is provided, it can be either 'pickle' or 'json'
runner2.set_results_filename("results_qpsk_awgn")
runner2.simulate()

```
```

Calling again will simply load the results from the file and it should be very fast.

```
In [20]:
```runner2.simulate()

```
```

```
In [21]:
```# However, if you increase the value of rep_max and run the `simulate` method again
# it will run only the remaining iterations
runner2.rep_max += 2000
runner2.simulate()

```
```

As before we have the "symbol_error_rate", "elapsed_time" and "num_skipped_reps" results.

```
In [22]:
```runner2.results

```
Out[22]:
```

```
In [23]:
```print(runner2.results.get_result_values_list("num_skipped_reps"))

```
```

`SkipThisOne`

exception being raised in the `_run_simulation`

method for some reason. It is not related to the simulation being stopped ealier due to `_keep_going`

returning `False`

. If you need to know how many iterations were run just add a result in `_run_simulation`

to track that.

```
In [24]:
```print("Symbol Errors:\n",
np.array(runner2.results.get_result_values_list("symbol_error_rate")))
elapsed_times2 = runner2.results.get_result_values_list("elapsed_time")
print("\nElapsed times:\n", np.array(elapsed_times2))
# pretty_time receives a float number corresponding to an elapsed time in seconds and returns a nice string
print(f"\nTotal elapsed time: {pretty_time(sum(elapsed_times2))}")

```
```

Notice the elapsed times for the low SNR values. The simulation was very fast for low SNR values, while for high SNR values it took longer. You can compare this to the case where the `_keep_going`

method was not implemented and the elapsed time was approximatelly the same for all SNR values.

Let's plot the results with the new simulation.

```
In [25]:
``````
SNR_db
```

```
Out[25]:
```

```
In [26]:
```# Now let's plot the results
fig, ax = plt.subplots(figsize=(5, 5))
ax.semilogy(SNR_db, qpsk.calcTheoreticalSER(SNR_db), "--", label="Theoretical")
ax.semilogy(SNR_db,
runner2.results.get_result_values_list("symbol_error_rate"),
label="Simulated")
ax.set_title("QPSK Symbol Error Rate (AWGN channel)")
ax.set_ylabel("Symbol Error Rate")
ax.set_xlabel("SNR (dB)")
ax.legend();

```
```

Now the simulated symbol error rate match the theoretical value also in the high SNR regime.

By implementing a subclass of `SimulationRunner`

you also gain the possibility of running the different configurations (different SNR values in our case) in parallel by using the `simulate_in_parallel`

method, instead of `simulate`

. This parallel computation is performed using the `ipyparallel`

library, which is an optional dependency of `pyphysim`

. After installing `ipyparallel`

you need to start a "cluster" using the `ipcluster start`

command in a shell. After than, import `Client`

from the `ipyparallel`

library and call the `load_balanced_view`

method on a `Client`

object to get a "view", which you need to pass to the `simulate_in_parallel`

. The following code illustrates the process.

```
from ipyparallel import LoadBalancedView, Client
# Before you run this line you must run `ipcluster start` in a terminal to start the cluster
c = Client()
# Get a "view" from the client
# You must have stared the ipcluster before this
view = c.load_balanced_view()
```

**Caveat**: Currently multiple configurations are simulated in parallel, but the not multiple repetitions of a single configuration are not run in parallel.

Now we nust need to call

```
runner2.simulate_in_parallel(view)
```

However, this will not work because the `AwgnSimulator2`

class was defined here in this notebook. For using the parallel infrastructure of the `ipyparallel`

library the simulation runner class needs to be pickleable, which is not possible when it is defined interactivelly in the python terminal. Therefore, move the code to a file and import the class. That should allow calling the `simulate_in_parallel`

method.

**Note**: When running in parallel only a single progressbar will be presented, but it will account the progress of all running configurations.

The `results`

atribute of the simulator (the `runner2`

object in this notebook) is an instance of the `SimulationResults`

class. There are a few ways to get the values stored in it. To see which results are stored in the `results`

object we can call the `get_result_names`

method.

```
In [27]:
```runner2.results.get_result_names()

```
Out[27]:
```

We can index this object with the name of the desired result.

```
In [28]:
```runner2.results["elapsed_time"]

```
Out[28]:
```

`Result`

objects (see the help of the `Result class for more`

), but if you just want the values then call the `get_result_values_list`

method of the `SimulationResults`

class instead.

```
In [29]:
```# Get the elapsed time of each configuration (each SNR in our case)
runner2.results.get_result_values_list("elapsed_time")

```
Out[29]:
```

`get_result_values_confidence_intervals`

method. You can even use the confidence interval in the implementation of the `_keep_going`

method, if you want.

```
In [30]:
```runner2.results.get_result_values_confidence_intervals("symbol_error_rate")

```
Out[30]:
```

To save the results to a file you can use `save_to_file`

method of `SimulationResults`

. That was already done for us in the end of the simulation, since we have called the `set_results_filename`

method of our simulator.

Once you have a file with the saved results you can load its data using the `SimulationResults.load_from_file`

. This returns a `SimulationResults`

object with the loaded data and you can use the previously discussed ways to access its data.

```
In [31]:
```# If the filename is provided without extension, then 'pickle' is assumed as the file extension
results = SimulationResults.load_from_file("results_qpsk_awgn")
results

```
Out[31]:
```

`SimulationResults`

object is calling its `to_dataframe`

method. It will create a (https://pandas.pydata.org/)[pandas] `DataFrame`

with the data in the `SimulationResults`

object. This required the `pandas`

library to be installed.

```
In [32]:
```results.to_dataframe()

```
Out[32]:
```

Notice that besides the simulation results, the dataframe also includes the simulation parameters (`SNR_db`

in our case) as well as the number of runned repetitions, skipped repetitions, and the elapsed time.

Indeed, the `SimulationResults`

object stores all of this information. Particularly, the `params`

attribute has all the simulation parameters. This allow you to have all necessary information of a simulation from the saved results file.

```
In [33]:
```results.params

```
Out[33]:
```

This notebook illustrated how to create simulators in `pyphysim`

, but it only touched a very small part of functionality to simulate physical layer transmission in wireless communications. You can explore the classes in `pyphysim`

for other functionality such as other channels, MIMO transmission, etc.. In general you can addapt code here to simulate other scenarios by just changing the implementation of the `_run_simulation`

and `_keep_going`

methods.