Skip to content

Quick Start

This walkthrough generates a synthetic Apollo USB downlink signal, runs it through the decoder, and prints the recovered telemetry. No RF hardware or GNU Radio installation required — the pure-Python engines handle everything.

  1. Generate a synthetic signal

    The signal generator creates complex baseband samples that replicate a real Apollo USB downlink: PM-modulated carrier with a 1.024 MHz BPSK subcarrier carrying PCM telemetry.

    import numpy as np
    from apollo import generate_usb_baseband
    from apollo.constants import SAMPLE_RATE_BASEBAND
    # Seed NumPy's global RNG so repeated runs print the same result
    # (presumably the generator draws its noise from it -- confirm).
    np.random.seed(42)
    known_payload = bytes(range(124))  # deterministic test data
    # The same 124-byte payload is supplied once per requested frame.
    signal, frame_bits = generate_usb_baseband(
        frames=5,
        frame_data=[known_payload] * 5,
        snr_db=30.0,
    )
    # Sample count divided by the baseband sample rate gives seconds; x1000 for ms.
    duration_ms = len(signal) / SAMPLE_RATE_BASEBAND * 1000
    print(f"Signal: {len(signal)} samples, {duration_ms:.1f} ms")
    print(f"Frames: {len(frame_bits)} x {len(frame_bits[0])} bits")
    Signal: 512000 samples, 100.0 ms
    Frames: 5 x 1024 bits

    That is 5 complete PCM frames at 51.2 kbps, each with a 32-bit sync word followed by 124 bytes of payload (4 + 124 = 128 bytes, i.e. 1024 bits per frame, so 5 frames take 5 × 1024 / 51,200 s = 100 ms), phase-modulated onto a carrier with additive noise at 30 dB SNR.

  2. Decode with the pure-Python engines

    Feed the known bit sequences through FrameSyncEngine to find frame boundaries, then DemuxEngine to extract individual telemetry words:

    from apollo import FrameSyncEngine, DemuxEngine
    sync = FrameSyncEngine()
    demux = DemuxEngine(output_format="scaled")
    # Feed one frame's worth of bits at a time; the sync engine keeps its
    # state between calls and hands back the frames it has recovered.
    for bits in frame_bits:
        frames = sync.process_bits(bits)
        for frame in frames:
            # The frame dict is passed along as metadata for the demux step.
            result = demux.process_frame(frame["frame_bytes"], frame)
            print(f"Frame {frame['frame_id']:2d} "
                  f"sync={frame['sync_confidence']}/32 "
                  f"state={frame['state']:<8s} "
                  f"words={len(result['words'])} "
                  f"agc_channels={len(result['agc_data'])}")
    Frame 1 sync=32/32 state=VERIFY words=124 agc_channels=3
    Frame 2 sync=32/32 state=LOCKED words=124 agc_channels=3
    Frame 3 sync=32/32 state=LOCKED words=124 agc_channels=3
    Frame 4 sync=32/32 state=LOCKED words=124 agc_channels=3
    Frame 5 sync=32/32 state=LOCKED words=124 agc_channels=3

    The sync engine moves from SEARCH to VERIFY on the first frame, then locks after two consecutive hits. Each frame produces 124 data words and 3 AGC channel extractions (channels 34, 35, and 57).

  3. Inspect a decoded frame

    The demux output gives you structured access to every telemetry word:

    # Decode the last frame. `frames` still holds the list returned by the
    # final sync.process_bits() call in the step 2 loop.
    last_frame = frames[-1]
    result = demux.process_frame(last_frame["frame_bytes"], last_frame)
    # Sync word fields
    sync_fields = result["sync"]
    print(f"Sync A: 0b{sync_fields['a_bits']:05b}")
    print(f"Sync core: 0x{sync_fields['core']:04x}")
    print(f"Sync B: 0b{sync_fields['b_bits']:06b}")
    print(f"Frame ID: {sync_fields['frame_id']}")
    # First 5 data words with voltage scaling
    print("\nWord Raw Voltage")
    for w in result["words"][:5]:
        print(f" {w['position']:3d} 0x{w['raw_value']:02x} {w['voltage']:.3f} V")
    # AGC channel data
    print("\nAGC channels:")
    for agc in result["agc_data"]:
        print(f" Ch {agc['channel_octal']} (word {agc['word_position']}): "
              f"raw=0x{agc['raw_value']:02x}")

If you have GNU Radio installed, the usb_downlink_receiver hierarchical block chains the entire demod pipeline into a single block. This is the flowgraph approach shown in examples/usb_downlink_demo.py:

#!/usr/bin/env python3
"""Full GR flowgraph: generate signal -> decode -> print frames."""
import numpy as np
from gnuradio import blocks, gr
from apollo.constants import SAMPLE_RATE_BASEBAND
from apollo.usb_downlink_receiver import usb_downlink_receiver
from apollo.usb_signal_gen import generate_usb_baseband
def main():
    """Generate five synthetic frames, decode them in a flowgraph, and report.

    Prints how many frame messages the receiver emitted and, if at least one
    arrived, the ``frame_id`` and ``sync_confidence`` metadata of the first.
    """
    np.random.seed(42)
    payload = bytes(np.random.randint(0, 256, size=124, dtype=np.uint8))
    signal, _ = generate_usb_baseband(
        frames=5,
        frame_data=[payload] * 5,
        snr_db=30.0,
    )

    # Build the flowgraph
    tb = gr.top_block()
    src = blocks.vector_source_c(signal.tolist())
    receiver = usb_downlink_receiver(output_format="scaled")
    sink = blocks.message_debug()
    tb.connect(src, receiver)
    # Decoded frames leave the receiver as messages, not as a sample stream.
    tb.msg_connect(receiver, "frames", sink, "store")
    # The vector source is finite, so run() returns once it has been drained.
    tb.run()

    n = sink.num_messages()
    print(f"Decoded {n} frame(s)")
    if n > 0:
        import pmt

        msg = sink.get_message(0)
        # The message is a pair; its first element is the metadata dict.
        meta = pmt.car(msg)
        # -1 is the fallback handed to dict_ref for a missing key.
        fid = pmt.to_long(
            pmt.dict_ref(meta, pmt.intern("frame_id"), pmt.from_long(-1))
        )
        conf = pmt.to_long(
            pmt.dict_ref(meta, pmt.intern("sync_confidence"), pmt.from_long(-1))
        )
        print(f"First frame: id={fid}, sync_confidence={conf}/32")


if __name__ == "__main__":
    main()

Run it:

Terminal window
uv run python examples/usb_downlink_demo.py

Here is what each stage did to the signal:

flowchart TB
    %% Transmit side: the stages applied by the synthetic signal generator.
    subgraph gen ["Signal Generator"]
        G1["generate_usb_baseband()"] --> G2["NRZ waveform<br/>+1/-1 per bit"]
        G2 --> G3["BPSK modulate<br/>onto 1.024 MHz"]
        G3 --> G4["PM modulate<br/>onto carrier"]
        G4 --> G5["Add AWGN<br/>@ 30 dB SNR"]
    end

    %% Receive side: each node reverses one generator stage, in opposite order.
    subgraph rx ["Receiver Chain"]
        R1["PM Demod<br/><em>PLL locks carrier,<br/>extracts phase</em>"] --> R2["Subcarrier Extract<br/><em>BPF 949-1099 kHz,<br/>translate to DC</em>"]
        R2 --> R3["BPSK Demod<br/><em>Costas loop resolves<br/>180-deg ambiguity</em>"]
        R3 --> R4["Frame Sync<br/><em>32-bit correlator<br/>Hamming distance ≤ 3</em>"]
        R4 --> R5["PCM Demux<br/><em>Split 128 words,<br/>identify AGC channels</em>"]
    end

    %% The noisy generator output feeds the receiver; the demux emits the PDUs.
    G5 --> R1
    R5 --> OUT["Telemetry PDUs"]
| Stage | Input | Output | What it does |
| --- | --- | --- | --- |
| PM Demod | Complex IQ | Float | Carrier PLL tracks frequency drift, atan2(Im, Re) extracts instantaneous phase |
| Subcarrier Extract | Float (composite) | Complex (baseband) | Bandpass filter isolates 1.024 MHz region, frequency-translating FIR shifts to DC |
| BPSK Demod | Complex (baseband) | Bytes (0/1) | Costas loop recovers carrier phase, Mueller-Muller TED locks to symbol transitions, binary slicer decides bits |
| Frame Sync | Byte stream | Frame PDUs | Slides a 32-bit window, correlates against sync pattern (even + odd), state machine: SEARCH -> VERIFY -> LOCKED |
| PCM Demux | Frame PDUs | Word PDUs | Separates sync (words 1-4) from data (words 5-128), applies A/D voltage scaling, extracts AGC channels 34/35/57 |

If you have GNU Radio installed, the streaming TX and RX blocks can be connected directly for a full round-trip test. This is the simplest way to verify the complete chain:

from gnuradio import blocks, gr
from apollo.usb_downlink_receiver import usb_downlink_receiver
from apollo.usb_signal_source import usb_signal_source
# Capture length: 10 frames of 102400 complex samples each.
n_frames = 10
samples_per_frame = 102400
flowgraph = gr.top_block()
source = usb_signal_source(voice_enabled=True, snr_db=30.0)
# head() passes a fixed number of items and then ends the stream.
limiter = blocks.head(gr.sizeof_gr_complex, n_frames * samples_per_frame)
receiver = usb_downlink_receiver(output_format="raw")
frame_store = blocks.message_debug()
flowgraph.connect(source, limiter, receiver)
flowgraph.msg_connect(receiver, "frames", frame_store, "store")
flowgraph.run()
recovered = frame_store.num_messages()
print(f"Recovered {recovered} frames")

The loopback_demo.py script wraps this pattern with argument parsing and frame analysis. Run it with:

Terminal window
uv run python examples/loopback_demo.py --frames 20 --voice