Skip to content

Build a Transmit Signal

gr-apollo provides two approaches to constructing Apollo USB downlink signals:

  • Batch (pure Python): generate_usb_baseband() produces a complete numpy array in one call. No GNU Radio required. Covered in the Generate Test Signals guide.
  • Streaming (GNU Radio blocks): Six composable TX blocks that mirror the RX chain, producing a continuous complex baseband stream through the GNU Radio scheduler.

The TX blocks map 1:1 to their RX counterparts — each modulation stage has a direct demodulation match. If you understand the receive chain, you already understand the transmit chain in reverse.

TX BlockRX BlockFunction
pcm_frame_sourcepcm_frame_syncFrame generation / frame sync
nrz_encoder(slicer in bpsk_demod)NRZ encoding / NRZ slicing
bpsk_subcarrier_modbpsk_subcarrier_demodBPSK modulation / demodulation
fm_voice_subcarrier_modvoice_subcarrier_demodVoice FM modulation / demodulation
pm_modpm_demodPM modulation / demodulation
sco_modsco_demodSCO modulation / demodulation
usb_signal_sourceusb_downlink_receiverFull chain TX / full chain RX
graph LR
    A["pcm_frame_source\n32-bit sync + data"]:::data --> B["nrz_encoder\n0/1 → +1/−1"]:::data
    B --> C["bpsk_subcarrier_mod\n× cos(1.024 MHz)"]:::rf
    C --> D["add_ff"]:::rf
    E["fm_voice_subcarrier_mod\nFM → 1.25 MHz"]:::rf --> F["× 0.764\n(1.68/2.2)"]:::rf
    F --> D
    D --> G["pm_mod\nexp(j · 0.133 · m(t))"]:::rf
    G --> H["Complex\nBaseband"]:::rf

    classDef data fill:#2d5016,stroke:#4a8c2a,color:#fff
    classDef rf fill:#1a3a5c,stroke:#3a7abd,color:#fff

The PCM path generates frame bits, encodes them as NRZ (+1/-1), and modulates onto a 1.024 MHz BPSK subcarrier. The optional voice path FM-modulates audio onto a 1.25 MHz subcarrier, scaled to maintain the spec power ratio (1.68 Vpp voice / 2.2 Vpp PCM). Both subcarriers are summed, then phase-modulated at 0.133 radians peak deviation to produce complex baseband.

  1. Minimal PCM transmitter

    Connect the four core blocks: frame source, NRZ encoder, BPSK modulator, and PM modulator. This produces a clean complex baseband signal with PCM telemetry only.

    from gnuradio import blocks, gr
    from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
    from apollo.constants import (
    PCM_HIGH_BIT_RATE,
    PCM_HIGH_WORDS_PER_FRAME,
    PCM_SUBCARRIER_HZ,
    PCM_WORD_LENGTH,
    SAMPLE_RATE_BASEBAND,
    )
    from apollo.nrz_encoder import nrz_encoder
    from apollo.pcm_frame_source import pcm_frame_source
    from apollo.pm_mod import pm_mod
    tb = gr.top_block()
    # Stage 1: Generate continuous PCM frame bits (0/1 byte stream)
    frame_src = pcm_frame_source(bit_rate=PCM_HIGH_BIT_RATE)
    # Stage 2: NRZ encode (byte 0/1 -> float +1/-1 at 5.12 MHz)
    nrz = nrz_encoder(bit_rate=PCM_HIGH_BIT_RATE, sample_rate=SAMPLE_RATE_BASEBAND)
    # Stage 3: BPSK modulate onto 1.024 MHz subcarrier
    bpsk = bpsk_subcarrier_mod(
    subcarrier_freq=PCM_SUBCARRIER_HZ,
    sample_rate=SAMPLE_RATE_BASEBAND,
    )
    # Stage 4: Phase modulate to complex baseband
    pm = pm_mod(pm_deviation=0.133, sample_rate=SAMPLE_RATE_BASEBAND)
    # Limit output to 10 frames worth of samples
    bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
    samples_per_frame = int(bits_per_frame * SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)
    head = blocks.head(gr.sizeof_gr_complex, 10 * samples_per_frame)
    snk = blocks.vector_sink_c()
    tb.connect(frame_src, nrz, bpsk, pm, head, snk)
    tb.run()
    print(f"Generated {len(snk.data())} complex samples")

    Each bit at 51.2 kbps occupies 100 samples at 5.12 MHz. Each 128-word frame is 1024 bits, so one frame produces 102,400 samples (~20 ms).

  2. Add voice

    Insert an FM voice subcarrier with an internal test tone, scale it by the spec ratio, and sum with the PCM subcarrier before PM modulation.

    from gnuradio import blocks, gr
    from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
    from apollo.constants import (
    PCM_HIGH_BIT_RATE,
    SAMPLE_RATE_BASEBAND,
    VOICE_FM_DEVIATION_HZ,
    VOICE_SUBCARRIER_HZ,
    )
    from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
    from apollo.nrz_encoder import nrz_encoder
    from apollo.pcm_frame_source import pcm_frame_source
    from apollo.pm_mod import pm_mod
    tb = gr.top_block()
    # PCM path (same as before)
    frame_src = pcm_frame_source(bit_rate=PCM_HIGH_BIT_RATE)
    nrz = nrz_encoder(bit_rate=PCM_HIGH_BIT_RATE, sample_rate=SAMPLE_RATE_BASEBAND)
    bpsk = bpsk_subcarrier_mod(sample_rate=SAMPLE_RATE_BASEBAND)
    tb.connect(frame_src, nrz, bpsk)
    # Voice path: 1 kHz test tone on 1.25 MHz FM subcarrier
    voice = fm_voice_subcarrier_mod(
    sample_rate=SAMPLE_RATE_BASEBAND,
    subcarrier_freq=VOICE_SUBCARRIER_HZ,
    fm_deviation=VOICE_FM_DEVIATION_HZ,
    tone_freq=1000.0,
    )
    # Scale voice to spec ratio: 1.68 Vpp / 2.2 Vpp = 0.764
    voice_gain = blocks.multiply_const_ff(1.68 / 2.2)
    tb.connect(voice, voice_gain)
    # Sum subcarriers
    adder = blocks.add_ff(1)
    tb.connect(bpsk, (adder, 0))
    tb.connect(voice_gain, (adder, 1))
    # PM modulate
    pm = pm_mod(pm_deviation=0.133, sample_rate=SAMPLE_RATE_BASEBAND)
    head = blocks.head(gr.sizeof_gr_complex, int(SAMPLE_RATE_BASEBAND * 0.2)) # 200 ms
    snk = blocks.vector_sink_c()
    tb.connect(adder, pm, head, snk)
    tb.run()
    print(f"Generated {len(snk.data())} samples with PCM + voice")
  3. Use external audio

    Set audio_input=True on the voice modulator to accept a float stream instead of the internal tone. This is how you modulate real Apollo crew recordings onto the subcarrier.

    import numpy as np
    from gnuradio import blocks, gr
    from scipy.io import wavfile
    from scipy.signal import resample_poly
    from apollo.constants import SAMPLE_RATE_BASEBAND
    from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
    # Load and upsample audio to 5.12 MHz
    input_rate, audio = wavfile.read("crew_voice.wav")
    audio_float = audio.astype(np.float32) / 32768.0
    # Resample: input rate -> 8 kHz -> 5.12 MHz (factor 640)
    from math import gcd
    g = gcd(8000, input_rate)
    audio_8k = resample_poly(audio_float, 8000 // g, input_rate // g).astype(np.float32)
    upsampled = resample_poly(audio_8k, 640, 1).astype(np.float32)
    # Build flowgraph with external audio input
    tb = gr.top_block()
    src = blocks.vector_source_f(upsampled.tolist())
    voice_mod = fm_voice_subcarrier_mod(
    sample_rate=SAMPLE_RATE_BASEBAND,
    audio_input=True, # accepts float stream input
    )
    snk = blocks.vector_sink_f()
    tb.connect(src, voice_mod, snk)
    tb.run()
  4. The convenience wrapper

    The usb_signal_source block wires together the full TX chain internally, matching the topology of usb_downlink_receiver on the RX side.

    from gnuradio import blocks, gr
    from apollo.constants import SAMPLE_RATE_BASEBAND
    from apollo.usb_signal_source import usb_signal_source
    tb = gr.top_block()
    # Full TX chain in one block
    tx = usb_signal_source(
    sample_rate=SAMPLE_RATE_BASEBAND,
    bit_rate=51200,
    pm_deviation=0.133,
    voice_enabled=True,
    voice_tone_hz=1000.0,
    snr_db=30.0, # add noise for realistic testing
    )
    head = blocks.head(gr.sizeof_gr_complex, int(SAMPLE_RATE_BASEBAND * 0.5))
    snk = blocks.vector_sink_c()
    tb.connect(tx, head, snk)
    tb.run()
    print(f"Generated {len(snk.data())} complex samples")

    The usb_signal_source exposes the same frame_data message port as pcm_frame_source, allowing dynamic payload injection at runtime.

The pcm_frame_source (and by extension usb_signal_source) accepts a frame_data message input for injecting custom payload bytes into the next generated frame. This is useful for transmitting specific telemetry patterns, test sequences, or data from an external source.

import pmt
# Prepare a 124-byte payload (words 5-128 of the PCM frame)
payload = bytes([0xDE, 0xAD, 0xBE, 0xEF] * 31) # 124 bytes
msg = pmt.init_u8vector(len(payload), list(payload))
# Post to the frame_data port
tx.to_basic_block()._post(pmt.intern("frame_data"), msg)

The message port also accepts PDU pairs (pmt.cons(meta, payload)) — the metadata car is ignored, and the payload cdr is used as the frame data bytes.

Both usb_signal_source and the batch generate_usb_baseband() support an snr_db parameter that adds additive white Gaussian noise to the output:

tx = usb_signal_source(
snr_db=20.0, # 20 dB SNR
)

Internally this adds a noise_source_c with amplitude computed from the target SNR. The PM signal has unit power (constant envelope), so noise amplitude is sqrt(1 / (2 * 10^(snr_db/10))).

SNRTypical Use
NoneClean signal, verifying block correctness
40 dBBaseline performance measurement
30 dBRealistic strong-signal conditions
20 dBStress-testing demodulator tracking loops
10 dBThreshold testing, error-rate characterization

The most common use of the TX chain is loopback testing: generate a signal and immediately decode it. This verifies the full modulation/demodulation round-trip:

from gnuradio import blocks, gr
from apollo.constants import SAMPLE_RATE_BASEBAND
from apollo.usb_downlink_receiver import usb_downlink_receiver
from apollo.usb_signal_source import usb_signal_source
tb = gr.top_block()
tx = usb_signal_source(voice_enabled=True, snr_db=30.0)
head = blocks.head(gr.sizeof_gr_complex, int(SAMPLE_RATE_BASEBAND * 0.4))
rx = usb_downlink_receiver(output_format="raw")
snk = blocks.message_debug()
tb.connect(tx, head, rx)
tb.msg_connect(rx, "frames", snk, "store")
tb.run()
print(f"TX -> RX loopback: recovered {snk.num_messages()} frames")