Quick Start
This walkthrough generates a synthetic Apollo USB downlink signal, runs it through the decoder, and prints the recovered telemetry. No RF hardware or GNU Radio installation required — the pure-Python engines handle everything.
Your first decode
Generate a synthetic signal
The signal generator creates complex baseband samples that replicate a real Apollo USB downlink: PM-modulated carrier with a 1.024 MHz BPSK subcarrier carrying PCM telemetry.
    import numpy as np
    from apollo import generate_usb_baseband
    from apollo.constants import SAMPLE_RATE_BASEBAND

    np.random.seed(42)
    known_payload = bytes(range(124))  # deterministic test data

    signal, frame_bits = generate_usb_baseband(
        frames=5,
        frame_data=[known_payload] * 5,
        snr_db=30.0,
    )

    duration_ms = len(signal) / SAMPLE_RATE_BASEBAND * 1000
    print(f"Signal: {len(signal)} samples, {duration_ms:.1f} ms")
    print(f"Frames: {len(frame_bits)} x {len(frame_bits[0])} bits")

Output:

    Signal: 512000 samples, 100.0 ms
    Frames: 5 x 1024 bits

That is 5 complete PCM frames at 51.2 kbps, each with a 32-bit sync word followed by 124 bytes of payload, phase-modulated onto a carrier with additive noise at 30 dB SNR.
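The numbers in that output can be cross-checked by hand. The sketch below redoes the arithmetic using only figures quoted on this page (128 words per frame, 51.2 kbps, 512000 samples in 100 ms). The constant names and the `frame_budget` helper are illustrative and not part of the `apollo` package, and the 5.12 MHz sample rate is inferred from the printed output rather than read from `apollo.constants`.

```python
# Back-of-the-envelope check of the frame and sample counts quoted above.
# All names here are illustrative; none of them come from the apollo package.

WORDS_PER_FRAME = 128      # 4 sync words + 124 data words
BITS_PER_WORD = 8          # 1024 bits / 128 words
BIT_RATE = 51_200          # bits per second
SAMPLE_RATE = 5_120_000    # Hz, inferred from 512000 samples per 100 ms

bits_per_frame = WORDS_PER_FRAME * BITS_PER_WORD              # 1024
frame_ms = bits_per_frame * 1000 / BIT_RATE                   # 20.0
samples_per_frame = SAMPLE_RATE * bits_per_frame // BIT_RATE  # 102400


def frame_budget(n_frames):
    """Return (total samples, duration in ms) for n_frames PCM frames."""
    return n_frames * samples_per_frame, n_frames * frame_ms


print(frame_budget(5))  # (512000, 100.0)
```

The same 102400 samples-per-frame figure is what sizes a fixed-length capture when you want a whole number of frames.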
Decode with the pure-Python engines
Feed the known bit sequences through `FrameSyncEngine` to find frame boundaries, then `DemuxEngine` to extract individual telemetry words:

    from apollo import FrameSyncEngine, DemuxEngine

    sync = FrameSyncEngine()
    demux = DemuxEngine(output_format="scaled")

    for i, bits in enumerate(frame_bits):
        frames = sync.process_bits(bits)
        for frame in frames:
            result = demux.process_frame(frame["frame_bytes"], frame)
            print(
                f"Frame {frame['frame_id']:2d} "
                f"sync={frame['sync_confidence']}/32 "
                f"state={frame['state']:<8s} "
                f"words={len(result['words'])} "
                f"agc_channels={len(result['agc_data'])}"
            )

Output:

    Frame  1 sync=32/32 state=VERIFY   words=124 agc_channels=3
    Frame  2 sync=32/32 state=LOCKED   words=124 agc_channels=3
    Frame  3 sync=32/32 state=LOCKED   words=124 agc_channels=3
    Frame  4 sync=32/32 state=LOCKED   words=124 agc_channels=3
    Frame  5 sync=32/32 state=LOCKED   words=124 agc_channels=3

The sync engine moves from SEARCH to VERIFY on the first frame, then locks after two consecutive hits. Each frame produces 124 data words and 3 AGC channel extractions (channels 34, 35, and 57).
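To make the SEARCH, VERIFY, LOCKED progression concrete, here is a minimal toy state machine. It is a sketch under stated assumptions, not the `FrameSyncEngine` implementation: `SYNC_PATTERN` is a placeholder value rather than the real Apollo sync word, the rule that any miss drops back to SEARCH is a guess, and reporting confidence as the number of matching bits is inferred from the `32/32` output. The error threshold of 3 bits and the two-hit lock rule are the ones this page states.

```python
# Toy frame-sync state machine. Illustrative only: this is not the apollo API.

SYNC_PATTERN = 0xDEADBEEF  # placeholder, NOT the real Apollo sync word
MAX_BIT_ERRORS = 3         # accept candidates within Hamming distance 3
HITS_TO_LOCK = 2           # lock after two consecutive hits


def hamming_distance(a, b):
    """Count the differing bits between two 32-bit words."""
    return bin((a ^ b) & 0xFFFFFFFF).count("1")


class ToySync:
    def __init__(self):
        self.state = "SEARCH"
        self.hits = 0

    def update(self, candidate):
        """Score one 32-bit candidate; return (state, matching bit count)."""
        distance = hamming_distance(candidate, SYNC_PATTERN)
        if distance <= MAX_BIT_ERRORS:
            self.hits += 1
            self.state = "LOCKED" if self.hits >= HITS_TO_LOCK else "VERIFY"
        else:
            # Assumption: a single miss resets the machine to SEARCH.
            self.hits = 0
            self.state = "SEARCH"
        return self.state, 32 - distance


toy = ToySync()
print(toy.update(SYNC_PATTERN))          # ('VERIFY', 32)
print(toy.update(SYNC_PATTERN ^ 0b111))  # ('LOCKED', 29)
```

A real correlator typically tolerates a few misses once locked, so treat the reset rule here as the simplest possible policy.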
Inspect a decoded frame
The demux output gives you structured access to every telemetry word:
    # Decode the last frame
    last_frame = frames[-1]
    result = demux.process_frame(last_frame["frame_bytes"], last_frame)

    # Sync word fields
    sync_fields = result["sync"]
    print(f"Sync A: 0b{sync_fields['a_bits']:05b}")
    print(f"Sync core: 0x{sync_fields['core']:04x}")
    print(f"Sync B: 0b{sync_fields['b_bits']:06b}")
    print(f"Frame ID: {sync_fields['frame_id']}")

    # First 5 data words with voltage scaling
    print("\nWord Raw Voltage")
    for w in result["words"][:5]:
        print(f" {w['position']:3d} 0x{w['raw_value']:02x} {w['voltage']:.3f} V")

    # AGC channel data
    print("\nAGC channels:")
    for agc in result["agc_data"]:
        print(
            f" Ch {agc['channel_octal']} (word {agc['word_position']}): "
            f"raw=0x{agc['raw_value']:02x}"
        )
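For intuition about where the sync fields come from, the following sketch splits a 128-byte frame into its sync and data words and unpacks the 32-bit sync word with shifts and masks. The layout is an assumption: the 5-bit A and 6-bit B widths are inferred from the format strings above, while the 15-bit core, the 6-bit frame ID, and the most-significant-first field order are guesses chosen so the widths sum to 32. The helper names are hypothetical, and `DemuxEngine` may lay the fields out differently.

```python
# Hypothetical sync-word layout, MSB first: A (5) | core (15) | B (6) | frame ID (6).
# Field widths and order are assumptions; check them against DemuxEngine.

SYNC_BYTES = 4  # words 1-4 of each 128-word frame


def split_frame(frame_bytes):
    """Separate the 4 sync bytes from the 124 data bytes."""
    return frame_bytes[:SYNC_BYTES], frame_bytes[SYNC_BYTES:]


def pack_sync_word(a_bits, core, b_bits, frame_id):
    """Build the 4 sync bytes from their fields (inverse of unpack)."""
    word = (a_bits & 0x1F) << 27 | (core & 0x7FFF) << 12
    word |= (b_bits & 0x3F) << 6 | (frame_id & 0x3F)
    return word.to_bytes(SYNC_BYTES, "big")


def unpack_sync_word(sync_bytes):
    """Extract the four fields from a big-endian 32-bit sync word."""
    word = int.from_bytes(sync_bytes, "big")
    return {
        "a_bits": (word >> 27) & 0x1F,
        "core": (word >> 12) & 0x7FFF,
        "b_bits": (word >> 6) & 0x3F,
        "frame_id": word & 0x3F,
    }


# Round trip with made-up field values, followed by 124 payload bytes.
frame = pack_sync_word(0b10101, 0x1234, 0b110011, 7) + bytes(range(124))
sync_part, data_part = split_frame(frame)
fields = unpack_sync_word(sync_part)
print(fields["frame_id"], len(data_part))  # 7 124
```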
Full-chain decode with GNU Radio
If you have GNU Radio installed, the `usb_downlink_receiver` hierarchical block chains the entire demod pipeline into a single block. This is the flowgraph approach shown in `examples/usb_downlink_demo.py`:
    #!/usr/bin/env python3
    """Full GR flowgraph: generate signal -> decode -> print frames."""

    import numpy as np
    from gnuradio import blocks, gr

    from apollo.constants import SAMPLE_RATE_BASEBAND
    from apollo.usb_downlink_receiver import usb_downlink_receiver
    from apollo.usb_signal_gen import generate_usb_baseband


    def main():
        np.random.seed(42)
        payload = bytes(np.random.randint(0, 256, size=124, dtype=np.uint8))

        signal, _ = generate_usb_baseband(
            frames=5,
            frame_data=[payload] * 5,
            snr_db=30.0,
        )

        # Build the flowgraph
        tb = gr.top_block()
        src = blocks.vector_source_c(signal.tolist())
        receiver = usb_downlink_receiver(output_format="scaled")
        sink = blocks.message_debug()

        tb.connect(src, receiver)
        tb.msg_connect(receiver, "frames", sink, "store")

        tb.run()

        n = sink.num_messages()
        print(f"Decoded {n} frame(s)")

        if n > 0:
            import pmt

            msg = sink.get_message(0)
            meta = pmt.car(msg)
            fid = pmt.to_long(
                pmt.dict_ref(meta, pmt.intern("frame_id"), pmt.from_long(-1))
            )
            conf = pmt.to_long(
                pmt.dict_ref(meta, pmt.intern("sync_confidence"), pmt.from_long(-1))
            )
            print(f"First frame: id={fid}, sync_confidence={conf}/32")


    if __name__ == "__main__":
        main()

Run it:

    uv run python examples/usb_downlink_demo.py

What just happened
Here is what each stage did to the signal:
flowchart TB
subgraph gen ["Signal Generator"]
G1["generate_usb_baseband()"] --> G2["NRZ waveform<br/>+1/-1 per bit"]
G2 --> G3["BPSK modulate<br/>onto 1.024 MHz"]
G3 --> G4["PM modulate<br/>onto carrier"]
G4 --> G5["Add AWGN<br/>@ 30 dB SNR"]
end
subgraph rx ["Receiver Chain"]
R1["PM Demod<br/><em>PLL locks carrier,<br/>extracts phase</em>"] --> R2["Subcarrier Extract<br/><em>BPF 949-1099 kHz,<br/>translate to DC</em>"]
R2 --> R3["BPSK Demod<br/><em>Costas loop resolves<br/>180-deg ambiguity</em>"]
R3 --> R4["Frame Sync<br/><em>32-bit correlator<br/>Hamming distance ≤ 3</em>"]
R4 --> R5["PCM Demux<br/><em>Split 128 words,<br/>identify AGC channels</em>"]
end
G5 --> R1
R5 --> OUT["Telemetry PDUs"]
| Stage | Input | Output | What it does |
|---|---|---|---|
| PM Demod | Complex IQ | Float | Carrier PLL tracks frequency drift, atan2(Im, Re) extracts instantaneous phase |
| Subcarrier Extract | Float (composite) | Complex (baseband) | Bandpass filter isolates 1.024 MHz region, frequency-translating FIR shifts to DC |
| BPSK Demod | Complex (baseband) | Bytes (0/1) | Costas loop recovers carrier phase, Mueller-Muller TED locks to symbol transitions, binary slicer decides bits |
| Frame Sync | Byte stream | Frame PDUs | Slides a 32-bit window, correlates against sync pattern (even + odd), state machine: SEARCH -> VERIFY -> LOCKED |
| PCM Demux | Frame PDUs | Word PDUs | Separates sync (words 1-4) from data (words 5-128), applies A/D voltage scaling, extracts AGC channels 34/35/57 |
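The modulation and demodulation stages in the table can be illustrated with a noiseless, stdlib-only round trip. This is a deliberately idealized sketch and not the receiver's implementation: it replaces the carrier PLL, Costas loop, and Mueller-Muller timing recovery with perfect knowledge of carrier phase, subcarrier phase, and bit boundaries. `PM_INDEX` is a hypothetical modulation index, the 5.12 MHz sample rate is inferred from the sample counts on this page, and the function names are illustrative.

```python
import cmath
import math

SAMPLE_RATE = 5_120_000    # Hz, inferred from 512000 samples per 100 ms
SUBCARRIER_HZ = 1_024_000  # Hz, from the text
BIT_RATE = 51_200          # bits per second, from the text
SAMPLES_PER_BIT = SAMPLE_RATE // BIT_RATE  # 100
PM_INDEX = 0.5             # peak phase deviation in radians, hypothetical


def subcarrier(n):
    """Reference 1.024 MHz cosine at sample index n."""
    return math.cos(2 * math.pi * SUBCARRIER_HZ * n / SAMPLE_RATE)


def modulate(bits):
    """Bits -> NRZ -> BPSK on the subcarrier -> PM on a complex baseband carrier."""
    samples = []
    for i, bit in enumerate(bits):
        nrz = 1.0 if bit else -1.0
        for n in range(i * SAMPLES_PER_BIT, (i + 1) * SAMPLES_PER_BIT):
            samples.append(cmath.exp(1j * PM_INDEX * nrz * subcarrier(n)))
    return samples


def demodulate(samples):
    """atan2 phase extraction, coherent mix to DC, integrate per bit, slice."""
    bits = []
    for start in range(0, len(samples), SAMPLES_PER_BIT):
        acc = 0.0
        for n in range(start, start + SAMPLES_PER_BIT):
            phase = math.atan2(samples[n].imag, samples[n].real)
            acc += phase * subcarrier(n)
        bits.append(1 if acc > 0 else 0)
    return bits


sent = [1, 0, 1, 1, 0, 0, 1, 0]
print(demodulate(modulate(sent)) == sent)  # True
```

With noise and an unknown carrier phase, the tracking loops in the real chain do the work that this sketch gets for free.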
Streaming loopback
If you have GNU Radio installed, the streaming TX and RX blocks can be connected directly for a full round-trip test. This is the simplest way to verify the complete chain:
    from gnuradio import blocks, gr

    from apollo.usb_downlink_receiver import usb_downlink_receiver
    from apollo.usb_signal_source import usb_signal_source

    tb = gr.top_block()

    tx = usb_signal_source(voice_enabled=True, snr_db=30.0)
    head = blocks.head(gr.sizeof_gr_complex, 10 * 102400)  # 10 frames
    rx = usb_downlink_receiver(output_format="raw")
    snk = blocks.message_debug()

    tb.connect(tx, head, rx)
    tb.msg_connect(rx, "frames", snk, "store")
    tb.run()

    print(f"Recovered {snk.num_messages()} frames")

The `loopback_demo.py` script wraps this pattern with argument parsing and frame analysis. Run it with:

    uv run python examples/loopback_demo.py --frames 20 --voice