Every component in gr-apollo falls into one of two categories: GNU Radio blocks that require the gnuradio runtime, and pure-Python engines that work standalone. The engines power the GR blocks internally, but you can also use them directly for testing, scripting, or integration without GNU Radio.
Note
Blocks that require GNU Radio are imported lazily. If gnuradio is not installed, the pure-Python engines (FrameSyncEngine, DemuxEngine, DownlinkEngine, AGCBridgeClient, UplinkEncoder, UplinkSerializerEngine, UplinkDeserializerEngine, RangingCodeGenerator, RangingCorrelator, generate_usb_baseband) remain available.
Module: apollo.usb_signal_gen
Type: Pure-Python function (numpy)
Purpose: Generate a synthetic Apollo USB downlink complex baseband signal for testing.
from apollo.usb_signal_gen import generate_usb_baseband
signal, frame_bits = generate_usb_baseband(frames=10, snr_db=20.0)
def generate_usb_baseband(
    frames: int = 1,
    bit_rate: float = 51_200,
    sample_rate: float = 5_120_000,
    pm_deviation: float = 0.133,
    voice_enabled: bool = False,
    voice_tone_hz: float = 1000.0,
    snr_db: float | None = None,
    frame_data: list[bytes] | None = None,
) -> tuple[np.ndarray, list[list[int]]]
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| frames | int | 1 | Number of PCM frames to generate |
| bit_rate | float | 51200 | PCM bit rate in bps. Use 51200 for high rate, 1600 for low rate |
| sample_rate | float | 5120000 | Output sample rate in Hz |
| pm_deviation | float | 0.133 | Peak PM deviation in radians (7.6 degrees) |
| voice_enabled | bool | False | Include 1.25 MHz FM voice subcarrier with test tone |
| voice_tone_hz | float | 1000.0 | Audio frequency of the voice test tone in Hz |
| snr_db | float \| None | None | If set, add AWGN at this signal-to-noise ratio (dB). None means no noise |
| frame_data | list[bytes] \| None | None | Optional per-frame payload bytes. None generates random data |
A tuple of (signal, frame_bits):
signal: np.ndarray of complex64 — the complex baseband IQ samples
frame_bits: list[list[int]] — the transmitted bit sequences per frame (for verification)
The module also exposes the individual stages of signal generation:
| Function | Signature | Returns |
| --- | --- | --- |
| generate_pcm_frame | (frame_id, odd, data, words_per_frame) -> list[int] | Bit list (length = words_per_frame * 8) |
| generate_nrz_waveform | (bits, bit_rate, sample_rate) -> np.ndarray | Float32 NRZ samples (+1.0 / -1.0) |
| generate_bpsk_subcarrier | (nrz_data, subcarrier_freq, sample_rate) -> np.ndarray | Float32 BPSK subcarrier samples |
| generate_fm_voice_subcarrier | (n_samples, sample_rate, tone_freq, subcarrier_freq, fm_deviation) -> np.ndarray | Float32 FM subcarrier samples |
Module: apollo.pm_demod
Type: gr.hier_block2
Purpose: Extract phase modulation from complex baseband using a carrier-tracking PLL.
from apollo.pm_demod import pm_demod
blk = pm_demod(carrier_pll_bw=0.02, sample_rate=5_120_000)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| in0 | Input | complex | Complex baseband IQ samples |
| out0 | Output | float | Demodulated composite signal containing all subcarriers |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| carrier_pll_bw | float | 0.02 | PLL loop bandwidth in rad/sample. Narrower tracks better, wider acquires faster |
| sample_rate | float | 5120000 | Input sample rate in Hz |

| Method | Signature | Description |
| --- | --- | --- |
| get_carrier_pll_bw | () -> float | Read current PLL loop bandwidth |
| set_carrier_pll_bw | (bw: float) -> None | Update PLL loop bandwidth at runtime |
input -> pll_carriertracking_cc -> complex_to_arg -> output
Module: apollo.subcarrier_extract
Type: gr.hier_block2
Purpose: Bandpass-filter and frequency-translate a subcarrier to complex baseband.
from apollo.subcarrier_extract import subcarrier_extract
blk = subcarrier_extract(center_freq=1_024_000, bandwidth=150_000)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| in0 | Input | float | PM demodulator output (composite subcarrier signal) |
| out0 | Output | complex | Baseband subcarrier signal (decimated) |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| center_freq | float | 1024000 | Subcarrier center frequency in Hz |
| bandwidth | float | 150000 | Passband bandwidth in Hz. Transition band is 20% of this value |
| sample_rate | float | 5120000 | Input sample rate in Hz |
| decimation | int | 1 | Output decimation factor |

| Property | Type | Description |
| --- | --- | --- |
| output_sample_rate | float | Effective output rate: sample_rate / decimation |
input -> float_to_complex -> freq_xlating_fir_filter_ccc -> output
The filter taps are designed as a Hamming-windowed lowpass at bandwidth / 2 cutoff with 20% transition width.
Module: apollo.bpsk_demod
Type: gr.hier_block2
Purpose: Recover NRZ bits from a BPSK baseband signal using Costas loop carrier recovery and Mueller-Muller symbol sync.
from apollo.bpsk_demod import bpsk_demod
blk = bpsk_demod(symbol_rate=51_200, sample_rate=5_120_000)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| in0 | Input | complex | Baseband BPSK signal (from subcarrier_extract) |
| out0 | Output | byte | Recovered NRZ bits (0 or 1), one per symbol period |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| symbol_rate | float | 51200 | Symbol (bit) rate in Hz |
| sample_rate | float | 5120000 | Input sample rate in Hz |
| loop_bw | float | 0.045 | Costas loop bandwidth in rad/sample |
input -> costas_loop_cc(order=2) -> symbol_sync_cc(TED_MUELLER_AND_MULLER) -> complex_to_real -> binary_slicer_fb -> output
Module: apollo.bpsk_subcarrier_demod
Type: gr.hier_block2
Purpose: Convenience wrapper combining subcarrier_extract and bpsk_demod for the 1.024 MHz PCM subcarrier.
from apollo.bpsk_subcarrier_demod import bpsk_subcarrier_demod
blk = bpsk_subcarrier_demod(sample_rate=5_120_000)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| in0 | Input | float | PM demodulator output (composite subcarrier signal) |
| out0 | Output | byte | Recovered NRZ bits (0 or 1) |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| subcarrier_freq | float | 1024000 | PCM subcarrier frequency in Hz (PCM_SUBCARRIER_HZ) |
| bandwidth | float | 150000 | Bandpass filter bandwidth in Hz (PCM_BPF_BW_HZ) |
| bit_rate | float | 51200 | PCM bit rate in Hz (PCM_HIGH_BIT_RATE) |
| sample_rate | float | 5120000 | Input sample rate in Hz |
| decimation | int | 1 | Subcarrier extraction decimation factor |
| loop_bw | float | 0.045 | BPSK Costas loop bandwidth |
input -> subcarrier_extract -> bpsk_demod -> output
Module: apollo.pcm_frame_sync
Type: Pure-Python class
Purpose: Acquire and track the 32-bit PCM sync pattern from an NRZ bit stream using a three-state machine (SEARCH, VERIFY, LOCKED).
from apollo.pcm_frame_sync import FrameSyncEngine
engine = FrameSyncEngine(bit_rate=51_200, max_bit_errors=3)
frames = engine.process_bits([1, 0, 1, 1, ...])

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| bit_rate | int | 51200 | PCM bit rate: 51200 (high) or 1600 (low). Determines words per frame |
| max_bit_errors | int | 3 | Maximum Hamming distance for a sync pattern match |
| verify_count | int | 2 | Consecutive frame-boundary hits to transition VERIFY to LOCKED |
| miss_limit | int | 3 | Consecutive frame-boundary misses to drop LOCKED to SEARCH |
| a_bits | int | 0b10101 | 5-bit patchboard-selectable A field |
| core | int | 0b111001101011100 | 15-bit fixed core (even-frame value) |
| b_bits | int | 0b110100 | 6-bit patchboard-selectable B field |

| Method | Signature | Description |
| --- | --- | --- |
| process_bits | (bits: list[int]) -> list[dict] | Feed bits into the engine. Returns list of completed frame dicts |
| reset | () -> None | Reset to SEARCH state, clear all buffers |

| Property | Type | Description |
| --- | --- | --- |
| state_name | str | Current state: "SEARCH", "VERIFY", or "LOCKED" |
| state | int | Numeric state: 0 (SEARCH), 1 (VERIFY), 2 (LOCKED) |
| bits_per_frame | int | Total bits per frame (1024 for high rate, 1600 for low rate) |
| words_per_frame | int | Words per frame (128 or 200) |
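The three-state acquisition logic can be illustrated with a small standalone sketch. It is not the FrameSyncEngine implementation: the `step` and `run` helpers are hypothetical, and two transitions are assumptions (a first match moves SEARCH to VERIFY, and any miss in VERIFY falls back to SEARCH). Only the `verify_count` and `miss_limit` semantics come from the parameter table.

```python
SEARCH, VERIFY, LOCKED = 0, 1, 2
STATE_NAMES = {SEARCH: "SEARCH", VERIFY: "VERIFY", LOCKED: "LOCKED"}


def step(state, hits, misses, sync_hit, verify_count=2, miss_limit=3):
    """Advance by one frame-boundary check; return (state, hits, misses)."""
    if state == SEARCH:
        # Assumption: the first pattern match moves straight to VERIFY.
        return (VERIFY, 0, 0) if sync_hit else (SEARCH, 0, 0)
    if state == VERIFY:
        if not sync_hit:
            # Assumption: a miss during verification restarts the search.
            return (SEARCH, 0, 0)
        hits += 1
        return (LOCKED, 0, 0) if hits >= verify_count else (VERIFY, hits, 0)
    # LOCKED: tolerate isolated misses, drop lock after miss_limit in a row.
    if sync_hit:
        return (LOCKED, 0, 0)
    misses += 1
    return (SEARCH, 0, 0) if misses >= miss_limit else (LOCKED, 0, misses)


def run(checks):
    """Replay a sequence of hit/miss booleans and collect the state names."""
    state, hits, misses = SEARCH, 0, 0
    names = []
    for sync_hit in checks:
        state, hits, misses = step(state, hits, misses, sync_hit)
        names.append(STATE_NAMES[state])
    return names


# Three hits acquire lock; three consecutive misses drop it again.
trace = run([True, True, True, False, False, False])
```

With the default thresholds, `trace` ends in "SEARCH" only after the third consecutive miss, which is the behavior `miss_limit` describes.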
Each frame returned by process_bits contains:
| Key | Type | Description |
| --- | --- | --- |
| frame_id | int | Frame number within subframe (1-50) |
| odd_frame | bool | Whether the frame has a complemented sync core |
| sync_confidence | int | Number of correct bits in the 32-bit sync word (32 - hamming_distance) |
| timestamp | float | time.time() when the frame was emitted |
| state | str | Engine state name when frame was emitted |
| frame_bytes | bytes | Complete frame packed as bytes |
| frame_bits | list[int] | Complete frame as bit list |
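As a rough illustration of how `sync_confidence` relates to the sync word, the sketch below packs a 32-bit word and counts matching bits. The field order (A, core, B, frame ID, most significant first) and the rule that odd-numbered frames carry the complemented core are assumptions made for this example; `build_sync_word` and `sync_confidence` are hypothetical helpers, not part of the package.

```python
A_BITS = 0b10101           # 5-bit A field (default from the parameter table)
CORE = 0b111001101011100   # 15-bit core, even-frame value
B_BITS = 0b110100          # 6-bit B field


def build_sync_word(frame_id, a_bits=A_BITS, core=CORE, b_bits=B_BITS):
    """Pack A | core | B | frame_id into 32 bits (assumed field order)."""
    if frame_id % 2 == 1:
        core ^= 0x7FFF  # assumption: odd frame IDs use the complemented core
    return (a_bits << 27) | (core << 12) | (b_bits << 6) | (frame_id & 0x3F)


def sync_confidence(received, expected):
    """Matching bits out of 32, i.e. 32 minus the Hamming distance."""
    return 32 - bin((received ^ expected) & 0xFFFFFFFF).count("1")


expected = build_sync_word(frame_id=2)
corrupted = expected ^ 0b101          # flip two bits
confidence = sync_confidence(corrupted, expected)
```

A received word with two flipped bits gives a confidence of 30, which would still pass the default `max_bit_errors=3` threshold.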
Module: apollo.pcm_frame_sync
Type: gr.basic_block
Purpose: GNU Radio wrapper around FrameSyncEngine. Consumes byte stream, emits PDU messages.
from apollo.pcm_frame_sync import pcm_frame_sync
blk = pcm_frame_sync(bit_rate=51_200, max_bit_errors=3)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| in0 | Input | byte (streaming) | NRZ bit stream from BPSK demodulator |
| "frames" | Output | Message (PDU) | Complete frame PDUs |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| bit_rate | int | 51200 | PCM bit rate: 51200 or 1600 |
| max_bit_errors | int | 3 | Maximum Hamming distance for sync match |
Each PDU is a pmt.cons(metadata, payload):
Metadata dict:
| Key | PMT Type | Description |
| --- | --- | --- |
| frame_id | pmt.from_long | Frame number (1-50) |
| odd_frame | pmt.from_bool | Complemented sync core |
| sync_confidence | pmt.from_long | Correct sync bits (out of 32) |
| timestamp | pmt.from_double | Emission timestamp |
Payload: pmt.init_u8vector containing frame bytes.
Module: apollo.pcm_demux
Type: Pure-Python class
Purpose: Demultiplex PCM frames into individual telemetry words, applying A/D voltage scaling and identifying AGC downlink channels.
from apollo.pcm_demux import DemuxEngine
engine = DemuxEngine(output_format="scaled")
result = engine.process_frame(frame_bytes, meta={"frame_id": 1})

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| output_format | str | "raw" | One of "raw" (8-bit integers), "scaled" (voltage-converted), "engineering" (future: named fields with units) |
| words_per_frame | int | 128 | Frame length: 128 (high rate) or 200 (low rate) |

| Method | Signature | Description |
| --- | --- | --- |
| process_frame | (frame_bytes: bytes, meta: dict \| None) -> dict | Demultiplex a complete frame |
| extract_word | (frame_bytes: bytes, word_position: int) -> dict | Extract a single word by 1-indexed position (1-128 or 1-200) |

| Key | Type | Description |
| --- | --- | --- |
| sync | dict | Parsed sync word fields: a_bits, core, b_bits, frame_id, word |
| words | list[dict] | Per-word dicts with position (1-indexed), raw_value, and optionally voltage, voltage_low_level |
| agc_data | list[dict] | AGC channel entries with channel, channel_octal, raw_value, word_position, optionally voltage |
| raw_frame | bytes | Original frame bytes |
| meta | dict | Pass-through metadata from frame sync |

| AGC Channel | Octal | Word Position (1-indexed) | Description |
| --- | --- | --- | --- |
| 28 | 034 | 34 | DNTM1: telemetry word high byte |
| 29 | 035 | 35 | DNTM2: telemetry word low byte |
| 47 | 057 | 57 | OUTLINK: digital downlink data |
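The 1-indexed word positions map onto frame bytes in the obvious way. The following is a minimal sketch of that lookup, assuming one byte per word; `extract_raw_word`, `find_agc_words`, and the `AGC_WORD_POSITIONS` table are hypothetical names used only for illustration, and the returned dicts are simpler than the ones DemuxEngine produces.

```python
# word position (1-indexed) -> (name, decimal AGC channel), from the table above
AGC_WORD_POSITIONS = {34: ("DNTM1", 28), 35: ("DNTM2", 29), 57: ("OUTLINK", 47)}


def extract_raw_word(frame_bytes, word_position):
    """Return the 8-bit value at a 1-indexed word position."""
    if not 1 <= word_position <= len(frame_bytes):
        raise ValueError(f"word_position {word_position} out of range")
    return frame_bytes[word_position - 1]


def find_agc_words(frame_bytes):
    """Collect the AGC channel words present in one frame."""
    entries = []
    for position, (name, channel) in sorted(AGC_WORD_POSITIONS.items()):
        entries.append({
            "name": name,
            "channel": channel,
            "word_position": position,
            "raw_value": extract_raw_word(frame_bytes, position),
        })
    return entries


# A dummy 128-word frame whose byte values equal their 0-based index.
frame = bytes(range(128))
agc_entries = find_agc_words(frame)
```

Because positions are 1-indexed, word 34 is `frame[33]`; forgetting the offset is an easy off-by-one to make when comparing against raw frame dumps.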
Module: apollo.pcm_demux
Type: gr.basic_block
Purpose: GNU Radio message-port wrapper. Receives frame PDUs, emits demultiplexed data on three output ports.
from apollo.pcm_demux import pcm_demux
blk = pcm_demux(output_format="scaled")

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| "frames" | Input | Message (PDU) | Complete frame PDUs from pcm_frame_sync |
| "telemetry" | Output | Message (PDU) | Individual word PDUs with channel metadata |
| "agc_data" | Output | Message (PDU) | AGC channel data (ch 34/35/57) |
| "raw_frame" | Output | Message (PDU) | Full frame passthrough |
No streaming I/O. This is a message-only block.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| output_format | str | "raw" | "raw", "scaled", or "engineering" |
| words_per_frame | int | 128 | 128 (high rate) or 200 (low rate) |
telemetry port metadata:
| Key | PMT Type | Description |
| --- | --- | --- |
| position | pmt.from_long | 1-indexed word number |
| raw_value | pmt.from_long | 8-bit raw value |
| voltage | pmt.from_double | Scaled voltage (if format is "scaled") |
agc_data port metadata:
| Key | PMT Type | Description |
| --- | --- | --- |
| channel | pmt.from_long | AGC channel number (decimal) |
| word_position | pmt.from_long | 1-indexed frame word number |
| raw_value | pmt.from_long | 8-bit raw value |
Module: apollo.downlink_decoder
Type: Pure-Python class
Purpose: Reassemble 15-bit AGC words from DNTM1/DNTM2 channel pairs and identify downlink list types.
from apollo.downlink_decoder import DownlinkEngine
engine = DownlinkEngine(buffer_size=400)
snapshot = engine.feed_agc_word(channel=28, raw_value=0x3F)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| buffer_size | int | 400 | Number of 15-bit words per downlink buffer before finalization |

| Method | Signature | Description |
| --- | --- | --- |
| feed_agc_word | (channel: int, raw_value: int) -> dict \| None | Process one AGC channel byte. Returns a snapshot dict when a buffer fills, else None |
| force_flush | () -> dict \| None | Force-finalize the current buffer (for end-of-data). Returns snapshot or None if empty |
| reset | () -> None | Clear all internal state |

| Channel | Decimal | Action |
| --- | --- | --- |
| DNTM1 | 28 | Stores as pending high byte, waits for matching DNTM2 |
| DNTM2 | 29 | Combines with pending DNTM1 into a 15-bit word via reassemble_agc_word |
| OUTLINK | 47 | Appended to separate outlink buffer |
| Other | n/a | Ignored, returns None |

| Key | Type | Description |
| --- | --- | --- |
| list_type_id | int | Downlink list type ID (lower 4 bits of first word) |
| list_name | str | Human-readable list name or "Unknown (ID=N)" |
| word_count | int | Number of 15-bit words in this snapshot |
| words | list[int] | The 15-bit AGC words |
| outlink_data | list[int] | Accumulated OUTLINK channel bytes |

| ID | Constant | Name |
| --- | --- | --- |
| 0 | DL_CM_POWERED_LIST | CM Powered Flight |
| 1 | DL_LM_ORBITAL_MANEUVERS | LM Orbital Maneuvers |
| 2 | DL_CM_COAST_ALIGN | CM Coast/Alignment |
| 3 | DL_LM_COAST_ALIGN | LM Coast/Alignment |
| 7 | DL_LM_DESCENT_ASCENT | LM Descent/Ascent |
| 8 | DL_LM_LUNAR_SURFACE_ALIGN | LM Lunar Surface Alignment |
| 9 | DL_CM_ENTRY_UPDATE | CM Entry Update |

| Function | Signature | Description |
| --- | --- | --- |
| reassemble_agc_word | (dntm1_byte: int, dntm2_byte: int) -> int | Combine two 8-bit channel bytes into one 15-bit AGC word. DNTM1 provides bits 14-8 (lower 7 bits of byte), DNTM2 provides bits 7-0 |
| identify_list_type | (first_word: int) -> tuple[int, str] | Extract list type ID from lower 4 bits of first word. Returns (id, name) |
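The bit layout described for the two helper functions can be written out directly. The sketch below reimplements them from the descriptions above rather than copying the package source, and the `DOWNLINK_LISTS` dict and the pairing loop are illustrative stand-ins for what DownlinkEngine does internally.

```python
DOWNLINK_LISTS = {
    0: "CM Powered Flight",
    1: "LM Orbital Maneuvers",
    2: "CM Coast/Alignment",
    3: "LM Coast/Alignment",
    7: "LM Descent/Ascent",
    8: "LM Lunar Surface Alignment",
    9: "CM Entry Update",
}


def reassemble_agc_word(dntm1_byte, dntm2_byte):
    """DNTM1 supplies bits 14-8 (its lower 7 bits), DNTM2 supplies bits 7-0."""
    return ((dntm1_byte & 0x7F) << 8) | (dntm2_byte & 0xFF)


def identify_list_type(first_word):
    """The list type ID is the lower 4 bits of the first word."""
    list_id = first_word & 0x0F
    return list_id, DOWNLINK_LISTS.get(list_id, f"Unknown (ID={list_id})")


# Pair DNTM1 (28) with the following DNTM2 (29); collect OUTLINK (47) apart.
stream = [(28, 0x3F), (29, 0x07), (47, 0x55), (28, 0x00), (29, 0x10)]
pending, words, outlink = None, [], []
for channel, raw in stream:
    if channel == 28:
        pending = raw
    elif channel == 29 and pending is not None:
        words.append(reassemble_agc_word(pending, raw))
        pending = None
    elif channel == 47:
        outlink.append(raw)

list_id, list_name = identify_list_type(words[0])
```

Here the first reassembled word is 0x3F07, whose low nibble is 7, so the example stream would be labeled as an LM Descent/Ascent list.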
Module: apollo.downlink_decoder
Type: gr.basic_block
Purpose: GNU Radio message-port wrapper. Receives AGC data PDUs from pcm_demux, emits decoded downlink snapshots.
from apollo.downlink_decoder import downlink_decoder
blk = downlink_decoder(buffer_size=400)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| "agc_data" | Input | Message (PDU) | AGC channel data from pcm_demux |
| "downlink" | Output | Message (PDU) | Decoded downlink list snapshots |
No streaming I/O.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| buffer_size | int | 400 | Words per downlink buffer |
Metadata dict:
| Key | PMT Type | Description |
| --- | --- | --- |
| list_type_id | pmt.from_long | Downlink list type ID |
| list_name | pmt.intern | Human-readable list name |
| word_count | pmt.from_long | Number of 15-bit words |
Payload: pmt.init_u8vector containing 15-bit words packed as big-endian byte pairs (2 bytes per word).
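For consumers that receive this payload as raw bytes, the big-endian pairing can be reproduced with the standard `struct` module. This is a sketch of the byte layout only; `pack_words` and `unpack_words` are hypothetical helper names, and the PMT wrapping is left out.

```python
import struct


def pack_words(words):
    """Pack 15-bit words as big-endian byte pairs (2 bytes per word)."""
    return b"".join(struct.pack(">H", word & 0x7FFF) for word in words)


def unpack_words(payload):
    """Inverse of pack_words: split a byte payload back into 15-bit words."""
    if len(payload) % 2:
        raise ValueError("payload length must be a multiple of 2")
    return [word for (word,) in struct.iter_unpack(">H", payload)]


payload = pack_words([0x3F07, 0x0010])
round_trip = unpack_words(payload)
```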
Module: apollo.voice_subcarrier_demod
Type: gr.hier_block2
Purpose: Extract and demodulate the 1.25 MHz FM voice subcarrier to 300-3000 Hz audio.
from apollo.voice_subcarrier_demod import voice_subcarrier_demod
blk = voice_subcarrier_demod(sample_rate=5_120_000, audio_rate=8000)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| in0 | Input | float | PM demodulator output (composite subcarrier signal) |
| out0 | Output | float | Demodulated audio at audio_rate Hz |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| sample_rate | float | 5120000 | Input sample rate in Hz |
| audio_rate | int | 8000 | Target output audio sample rate in Hz |

| Property | Type | Description |
| --- | --- | --- |
| output_sample_rate | float | Actual output sample rate (equals audio_rate) |
| extracted_rate | float | Intermediate rate after subcarrier extraction |
input -> subcarrier_extract(1.25 MHz, BW=58 kHz, decimation=auto)
-> quadrature_demod_cf(gain)
-> band_pass_filter(300-3000 Hz)
-> rational_resampler_fff -> output
The decimation factor is computed automatically: int(sample_rate / (58000 * 2.2)). At the default 5.12 MHz input rate, this yields decimation=40 and an intermediate rate of 128 kHz. The FM discriminator gain is extracted_rate / (2 * pi * 29000).
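The arithmetic in the paragraph above can be checked in a few lines. The function below only restates the two formulas; `voice_demod_rates` and `VOICE_BW_HZ` are hypothetical names chosen for this sketch.

```python
import math

VOICE_BW_HZ = 58_000            # subcarrier extraction bandwidth
VOICE_FM_DEVIATION_HZ = 29_000  # voice subcarrier FM deviation


def voice_demod_rates(sample_rate=5_120_000):
    """Return (decimation, extracted_rate, discriminator_gain)."""
    decimation = int(sample_rate / (VOICE_BW_HZ * 2.2))
    extracted_rate = sample_rate / decimation
    gain = extracted_rate / (2 * math.pi * VOICE_FM_DEVIATION_HZ)
    return decimation, extracted_rate, gain


decimation, extracted_rate, gain = voice_demod_rates()
```

At the default input rate the quotient is about 40.1, so truncation gives 40 and the discriminator gain comes out near 0.70.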
Module: apollo.sco_demod
Type: gr.hier_block2
Purpose: Demodulate one of the 9 Subcarrier Oscillator channels to recover a 0-5V analog sensor reading. Valid in FM downlink mode only.
from apollo.sco_demod import sco_demod
blk = sco_demod(sco_number=5, sample_rate=5_120_000)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| in0 | Input | float | PM demodulator output (composite subcarrier signal) |
| out0 | Output | float | Recovered sensor voltage (0.0 to 5.0 V) |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| sco_number | int | 1 | SCO channel number (1-9). See constants reference for frequencies |
| sample_rate | float | 5120000 | Input sample rate in Hz |

| Property | Type | Description |
| --- | --- | --- |
| center_freq | float | Center frequency of the selected SCO channel (Hz) |
| deviation_hz | float | FM deviation in Hz (7.5% of center frequency) |
| output_sample_rate | float | Sample rate of the output stream |
input -> subcarrier_extract(sco_freq, BW=15% of center, decimation=auto)
-> quadrature_demod_cf(gain)
-> multiply_const_ff(2.5)
-> add_const_ff(2.5) -> output
The voltage mapping is linear: demod output of -1.0 maps to 0V, 0.0 maps to 2.5V, +1.0 maps to 5V.
| SCO Number | Center Frequency | Deviation (+/-) | Bandwidth (15%) |
| --- | --- | --- | --- |
| 1 | 14,500 Hz | 1,087.5 Hz | 2,175 Hz |
| 2 | 22,000 Hz | 1,650 Hz | 3,300 Hz |
| 3 | 30,000 Hz | 2,250 Hz | 4,500 Hz |
| 4 | 40,000 Hz | 3,000 Hz | 6,000 Hz |
| 5 | 52,500 Hz | 3,937.5 Hz | 7,875 Hz |
| 6 | 70,000 Hz | 5,250 Hz | 10,500 Hz |
| 7 | 95,000 Hz | 7,125 Hz | 14,250 Hz |
| 8 | 125,000 Hz | 9,375 Hz | 18,750 Hz |
| 9 | 165,000 Hz | 12,375 Hz | 24,750 Hz |
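The deviation and bandwidth columns follow from the center frequencies, and the output scaling is a single linear map. A small sketch, using a hypothetical `SCO_CENTER_HZ` dict in place of the package constants:

```python
SCO_CENTER_HZ = {
    1: 14_500, 2: 22_000, 3: 30_000, 4: 40_000, 5: 52_500,
    6: 70_000, 7: 95_000, 8: 125_000, 9: 165_000,
}


def sco_deviation_hz(sco_number):
    """FM deviation is 7.5% of the channel center frequency."""
    return SCO_CENTER_HZ[sco_number] * 7.5 / 100


def sco_bandwidth_hz(sco_number):
    """Extraction bandwidth is 15% of the channel center frequency."""
    return SCO_CENTER_HZ[sco_number] * 15 / 100


def demod_to_volts(normalized):
    """Map discriminator output in [-1, +1] to 0-5 V (x2.5, then +2.5)."""
    return 2.5 * normalized + 2.5


channel_5 = (sco_deviation_hz(5), sco_bandwidth_hz(5))
volts = [demod_to_volts(x) for x in (-1.0, 0.0, 1.0)]
```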
Module: apollo.fm_demod
Type: gr.hier_block2
Purpose: Extract frequency modulation from complex baseband using a carrier-tracking PLL and quadrature demodulation.
from apollo.fm_demod import fm_demod
blk = fm_demod(carrier_pll_bw=0.02, fm_deviation_hz=500_000)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| in0 | Input | complex | Complex baseband IQ samples |
| out0 | Output | float | Demodulated composite signal (normalized: +/-1.0 at full deviation) |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| carrier_pll_bw | float | 0.02 | PLL loop bandwidth in rad/sample |
| fm_deviation_hz | float | 500000 | Expected max FM deviation in Hz (sets demod gain) |
| sample_rate | float | 5120000 | Input sample rate in Hz |

| Method | Signature | Description |
| --- | --- | --- |
| get_carrier_pll_bw | () -> float | Read current PLL loop bandwidth |
| set_carrier_pll_bw | (bw: float) -> None | Update PLL loop bandwidth at runtime |
| get_fm_deviation | () -> float | Read configured FM deviation |
input -> pll_carriertracking_cc -> quadrature_demod_cf(gain) -> output
where gain = sample_rate / (2*pi*fm_deviation_hz).
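To see why this gain normalizes the output to +/-1.0, note that a quadrature demodulator scales the per-sample phase step, and a tone offset by the full deviation advances the phase by 2*pi*fm_deviation_hz/sample_rate each sample. The helper names below are hypothetical and the sketch ignores the PLL stage.

```python
import math


def fm_demod_gain(sample_rate=5_120_000, fm_deviation_hz=500_000):
    """Discriminator gain: sample_rate / (2*pi*fm_deviation_hz)."""
    return sample_rate / (2 * math.pi * fm_deviation_hz)


def phase_step(freq_offset_hz, sample_rate=5_120_000):
    """Phase advance per sample (radians) for a given frequency offset."""
    return 2 * math.pi * freq_offset_hz / sample_rate


gain = fm_demod_gain()
full_scale = gain * phase_step(500_000)    # offset equal to full deviation
half_scale = gain * phase_step(-250_000)   # half deviation, negative side
```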
Module: apollo.agc_bridge
Type: Pure-Python class (threaded TCP client)
Purpose: Connect to a running Virtual AGC (yaAGC) emulator over TCP, receive telemetry packets, and send uplink commands.
from apollo.agc_bridge import AGCBridgeClient
def on_packet(channel, value):
    print(f"Ch {channel}: {value}")
client = AGCBridgeClient(host="localhost", port=19697, on_packet=on_packet)
client.start()  # launch the receive thread; send() returns False until connected
client.send(channel=37, value=0x1234)
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| host | str | "localhost" | yaAGC hostname or IP address |
| port | int | 19697 | yaAGC TCP port |
| channel_filter | frozenset[int] \| None | AGC_TELECOM_CHANNELS | Set of channel numbers to pass through. None accepts all channels |
| on_packet | Callable[[int, int], None] \| None | None | Callback for received packets: on_packet(channel, value). Called from the rx thread |
| on_status | Callable[[str], None] \| None | None | Callback for connection state changes: on_status(state_str). States: "disconnected", "connecting", "connected" |

| Method | Signature | Description |
| --- | --- | --- |
| start | () -> None | Launch background receive thread. Auto-reconnects on disconnect with exponential backoff (0.5s to 30s) |
| stop | () -> None | Signal thread to stop, close socket, join thread (5s timeout) |
| send | (channel: int, value: int) -> bool | Send a 4-byte I/O packet to yaAGC. Returns True on success, False if not connected |

| Property | Type | Description |
| --- | --- | --- |
| state | str | Current connection state: "disconnected", "connecting", or "connected" |
| connected | bool | True if state is "connected" |

| Parameter | Value |
| --- | --- |
| Base delay | 0.5 seconds |
| Max delay | 30.0 seconds |
| Backoff factor | 2.0x |
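The reconnect schedule implied by these three values can be sketched as a generator. This shows the delay sequence only; `backoff_delays` is a hypothetical helper, and whether the real client resets the delay after a successful connection is not modeled here.

```python
from itertools import islice


def backoff_delays(base=0.5, factor=2.0, max_delay=30.0):
    """Yield reconnect delays: base, base*factor, ... capped at max_delay."""
    delay = base
    while True:
        yield delay
        delay = min(delay * factor, max_delay)


delays = list(islice(backoff_delays(), 8))
```

The sequence doubles from 0.5 s up to 16 s, then clamps at 30 s for every later attempt.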
Module: apollo.agc_bridge
Type: gr.basic_block
Purpose: GNU Radio wrapper bridging PDU message ports to a Virtual AGC TCP connection.
from apollo.agc_bridge import agc_bridge
blk = agc_bridge(host="localhost", port=19697)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| "uplink_data" | Input | Message (PDU) | pmt.cons(meta, pmt.cons(channel, value)) to send to AGC |
| "downlink_data" | Output | Message (PDU) | Received AGC packets as pmt.cons(meta_dict, pmt.cons(channel, value)) |
| "status" | Output | Message | pmt.intern(state_str) on connection state changes |
No streaming I/O.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| host | str | "localhost" | yaAGC hostname |
| port | int | 19697 | yaAGC TCP port |
The block calls client.start() on GR start() and client.stop() on GR stop().
Module: apollo.uplink_encoder
Type: Pure-Python class
Purpose: Encode ground-station commands (VERB, NOUN, DATA, PROCEED) into AGC INLINK channel (045 octal) word sequences.
from apollo.uplink_encoder import UplinkEncoder
enc = UplinkEncoder()
pairs = enc.encode_verb_noun(verb=37, noun=1)
# Returns: [(37, 17408), (37, 3072), (37, 7168), ...]
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| channel | int | 37 | AGC I/O channel for uplink. Default is channel 045 octal (37 decimal, INLINK) |

| Method | Signature | Description |
| --- | --- | --- |
| encode_keycode | (keycode: int) -> tuple[int, int] | Encode a single DSKY keycode. Returns (channel, value) with keycode in bits 14-10 |
| encode_digit | (digit: int) -> tuple[int, int] | Encode a decimal digit 0-9 |
| encode_verb | (verb_number: int) -> list[tuple[int, int]] | Encode VERB + 2 digit keys. verb_number range: 0-99 |
| encode_noun | (noun_number: int) -> list[tuple[int, int]] | Encode NOUN + 2 digit keys. noun_number range: 0-99 |
| encode_data | (value: int, signed: bool = True) -> list[tuple[int, int]] | Encode a 5-digit data entry with optional sign. Range: -99999 to +99999 (signed) or 0 to 99999 (unsigned) |
| encode_proceed | () -> list[tuple[int, int]] | Encode a PROCEED (ENTER) keystroke |
| encode_verb_noun | (verb: int, noun: int) -> list[tuple[int, int]] | Convenience: full VERB + NOUN + ENTER sequence |
| encode_command | (command_type: str, data: int \| None) -> list[tuple[int, int]] | Dispatch by type string: "VERB", "NOUN", "DATA", "PROCEED" |

| Key | Octal | Decimal | Constant |
| --- | --- | --- | --- |
| VERB | 021 | 17 | KEYCODE_VERB |
| NOUN | 037 | 31 | KEYCODE_NOUN |
| ENTER/PROCEED | 034 | 28 | KEYCODE_ENTER |
| RESET/KEY RELEASE | 022 | 18 | KEYCODE_RESET |
| CLEAR | 036 | 30 | KEYCODE_CLEAR |
| + | 032 | 26 | KEYCODE_PLUS |
| - | 033 | 27 | KEYCODE_MINUS |
| 0 | 020 | 16 | KEYCODE_DIGITS[0] |
| 1-7 | 001-007 | 1-7 | KEYCODE_DIGITS[1-7] |
| 8 | 010 | 8 | KEYCODE_DIGITS[8] |
| 9 | 011 | 9 | KEYCODE_DIGITS[9] |
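The keycode table and the "bits 14-10" placement are enough to work out the word values by hand. The sketch below does that for a VERB + NOUN + ENTER sequence. It assumes the lower ten bits of each word are zero, which the text above does not state, and the functions are standalone illustrations rather than the UplinkEncoder methods.

```python
KEYCODE_VERB = 0o21
KEYCODE_NOUN = 0o37
KEYCODE_ENTER = 0o34
# Digit 0 has its own code (020 octal); 1-9 map to their own values.
KEYCODE_DIGITS = [0o20, 1, 2, 3, 4, 5, 6, 7, 0o10, 0o11]
INLINK_CHANNEL = 0o45  # 37 decimal


def encode_keycode(keycode, channel=INLINK_CHANNEL):
    """Place the 5-bit keycode in bits 14-10 (low bits assumed zero)."""
    return channel, (keycode & 0x1F) << 10


def encode_two_digits(number):
    """Encode a 0-99 value as two digit keystrokes, tens first."""
    if not 0 <= number <= 99:
        raise ValueError("number must be in 0-99")
    tens, ones = divmod(number, 10)
    return [encode_keycode(KEYCODE_DIGITS[tens]),
            encode_keycode(KEYCODE_DIGITS[ones])]


def encode_verb_noun(verb, noun):
    """VERB, two digits, NOUN, two digits, ENTER."""
    return ([encode_keycode(KEYCODE_VERB)] + encode_two_digits(verb)
            + [encode_keycode(KEYCODE_NOUN)] + encode_two_digits(noun)
            + [encode_keycode(KEYCODE_ENTER)])


pairs = encode_verb_noun(verb=37, noun=1)
```

For verb 37 and noun 1 this produces seven words; the leading zero of noun 01 uses keycode 020 octal, not 0, which is why its word value is 16384.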
Module: apollo.uplink_encoder
Type: gr.basic_block
Purpose: GNU Radio message-port wrapper. Receives command PDUs, emits individual uplink word PDUs.
from apollo.uplink_encoder import uplink_encoder
blk = uplink_encoder(channel=37)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| "command" | Input | Message (PDU) | Command PDU with metadata dict |
| "uplink_words" | Output | Message (PDU) | One PDU per (channel, value) pair |
No streaming I/O.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| channel | int | 37 | AGC INLINK channel (045 octal) |
The input PDU metadata dict must contain:
| Key | PMT Type | Required | Description |
| --- | --- | --- | --- |
| "type" | pmt.intern | Yes | "VERB", "NOUN", "DATA", or "PROCEED" |
| "data" | pmt.from_long | For VERB/NOUN/DATA | Verb number, noun number, or data value |
Each output PDU contains:
Metadata:
| Key | PMT Type | Description |
| --- | --- | --- |
| "channel" | pmt.from_long | AGC channel number |
| "value" | pmt.from_long | 15-bit value |
Payload: pmt.cons(pmt.from_long(channel), pmt.from_long(value))
Module: apollo.usb_downlink_receiver
Type: gr.hier_block2
Purpose: Complete Apollo USB downlink receiver chain in a single block — complex baseband input to telemetry PDU outputs.
from apollo.usb_downlink_receiver import usb_downlink_receiver
blk = usb_downlink_receiver(sample_rate=5_120_000, output_format="scaled")

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| in0 | Input | complex (streaming) | Baseband IQ samples at sample_rate |
| "frames" | Output | Message (PDU) | Complete PCM frame PDUs (from frame sync) |
| "telemetry" | Output | Message (PDU) | Individual word PDUs with channel metadata |
| "agc_data" | Output | Message (PDU) | AGC channel data (ch 34/35/57) |
| "raw_frame" | Output | Message (PDU) | Full frame passthrough |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| sample_rate | float | 5120000 | Input sample rate in Hz |
| bit_rate | int | 51200 | PCM bit rate: 51200 (high) or 1600 (low) |
| carrier_pll_bw | float | 0.02 | PM demodulator PLL bandwidth (rad/sample) |
| subcarrier_bw | float | 150000 | PCM subcarrier bandpass filter width (Hz) |
| bpsk_loop_bw | float | 0.045 | BPSK Costas loop bandwidth (rad/sample) |
| max_bit_errors | int | 3 | Frame sync Hamming distance threshold |
| output_format | str | "raw" | Demux output: "raw", "scaled", or "engineering" |
complex in -> pm_demod -> subcarrier_extract(1.024 MHz)
-> bpsk_demod -> pcm_frame_sync -> "frames"
pcm_frame_sync -> pcm_demux -> "telemetry", "agc_data", "raw_frame"
The internal blocks are exposed as instance attributes for runtime inspection or parameter adjustment:
| Attribute | Type | Block |
| --- | --- | --- |
| self.pm | pm_demod | PM demodulator |
| self.sc_extract | subcarrier_extract | Subcarrier extractor |
| self.bpsk | bpsk_demod | BPSK demodulator |
| self.frame_sync | pcm_frame_sync | Frame synchronizer |
| self.demux | pcm_demux | Frame demultiplexer |
Module: apollo.pcm_frame_source
Type: Pure-Python class
Purpose: Generate PCM telemetry frames as bit lists. Maintains rolling frame counter (1-50), auto-complements sync core on odd frames.
from apollo.pcm_frame_source import FrameSourceEngine
engine = FrameSourceEngine(bit_rate=51200)
bits = engine.next_frame()

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| bit_rate | int | 51200 | PCM bit rate: 51200 (high, 128 words/frame) or 1600 (low, 200 words/frame) |

| Method | Signature | Description |
| --- | --- | --- |
| next_frame | (data: bytes \| None) -> list[int] | Generate next frame as bit list. data=None gives zero-fill. Returns list of 0/1 values, length = words_per_frame * 8 |

| Property | Type | Description |
| --- | --- | --- |
| bit_rate | int | PCM bit rate in bps |
| words_per_frame | int | 128 (high) or 200 (low) |
| frame_counter | int | Current frame number (1-50) |
Module: apollo.pcm_frame_source
Type: gr.sync_block
Purpose: GNU Radio source block producing continuous PCM frame bit stream. Outputs bytes (0 or 1). Accepts frame_data message input for dynamic payload injection.
from apollo.pcm_frame_source import pcm_frame_source
blk = pcm_frame_source(bit_rate=51200)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| (none) | Input | (none) | Source block; no streaming input |
| out0 | Output | byte | NRZ bit stream (values 0 or 1) |
| "frame_data" | Input | Message | PMT u8vector or PDU for dynamic payload |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| bit_rate | int | 51200 | PCM bit rate: 51200 (high, 128 words/frame) or 1600 (low, 200 words/frame) |
Uses FrameSourceEngine internally with a deque bit buffer to bridge frame-granularity generation and sample-granularity scheduling.
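The buffering idea can be sketched without GNU Radio: the scheduler asks for an arbitrary number of output items, while frames arrive in fixed-size chunks, so a deque absorbs the mismatch. `BitBuffer` and `make_test_frames` are hypothetical names for this illustration and do not mirror the block's internals.

```python
from collections import deque


class BitBuffer:
    """Serve arbitrary-length bit requests from whole-frame refills."""

    def __init__(self, next_frame):
        self._next_frame = next_frame  # callable returning one frame's bits
        self._bits = deque()

    def take(self, count):
        # Refill one frame at a time until the request can be satisfied.
        while len(self._bits) < count:
            self._bits.extend(self._next_frame())
        return [self._bits.popleft() for _ in range(count)]


def make_test_frames(frame_len=8):
    """Stand-in frame generator: all-ones frame, all-zeros frame, repeat."""
    count = 0

    def next_frame():
        nonlocal count
        count += 1
        return [count % 2] * frame_len

    return next_frame


buf = BitBuffer(make_test_frames())
first = buf.take(5)   # served from the first frame
second = buf.take(6)  # spans the boundary between frames one and two
```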
Module: apollo.nrz_encoder
Type: gr.hier_block2
Purpose: Convert PCM bit stream (byte 0/1) to NRZ float waveform (+1.0/-1.0) at the output sample rate.
from apollo.nrz_encoder import nrz_encoder
blk = nrz_encoder(bit_rate=51200, sample_rate=5_120_000)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| in0 | Input | byte | Bit stream from pcm_frame_source (values 0 or 1) |
| out0 | Output | float | NRZ waveform (+1.0 / -1.0) at sample_rate |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| bit_rate | int | 51200 | PCM bit rate in bps |
| sample_rate | float | 5120000 | Output sample rate in Hz |

| Property | Type | Description |
| --- | --- | --- |
| samples_per_bit | int | Samples per bit period: sample_rate / bit_rate |
input -> char_to_float -> multiply_const_ff(2.0) -> add_const_ff(-1.0) -> repeat(samples_per_bit) -> output
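The same chain can be expressed in plain Python to check the mapping and the upsampling factor. This is an illustrative equivalent of the flow above, not the block itself; `nrz_encode` is a hypothetical name.

```python
def nrz_encode(bits, bit_rate=51_200, sample_rate=5_120_000):
    """Map 0/1 to -1.0/+1.0 (x2, then -1) and hold each value for one bit period."""
    samples_per_bit = int(sample_rate // bit_rate)
    waveform = []
    for bit in bits:
        level = 2.0 * bit - 1.0
        waveform.extend([level] * samples_per_bit)
    return waveform


waveform = nrz_encode([1, 0])
```

At the default rates each bit is held for 100 samples, so two bits produce 200 output samples.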
Module: apollo.bpsk_subcarrier_mod
Type: gr.hier_block2
Purpose: Modulate NRZ float data onto 1.024 MHz cosine subcarrier via multiplication: output(t) = nrz(t) * cos(2*pi*f_sc*t).
from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
blk = bpsk_subcarrier_mod(subcarrier_freq=1_024_000, sample_rate=5_120_000)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| in0 | Input | float | NRZ waveform from nrz_encoder |
| out0 | Output | float | BPSK modulated subcarrier |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| subcarrier_freq | float | 1024000 | Subcarrier frequency in Hz (PCM_SUBCARRIER_HZ) |
| sample_rate | float | 5120000 | Sample rate in Hz |

| Property | Type | Description |
| --- | --- | --- |
| subcarrier_freq | float | Subcarrier frequency in Hz |
| sample_rate | float | Sample rate in Hz |
input (NRZ) -> (mixer, port 0); sig_source_f(cos, subcarrier_freq) -> (mixer, port 1); mixer -> output
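A sample-by-sample version of the same multiplication, written with the standard `math` module, makes the 180-degree phase flip visible. The function name is hypothetical and the sketch assumes the cosine source starts at zero phase.

```python
import math


def bpsk_subcarrier(nrz, subcarrier_freq=1_024_000, sample_rate=5_120_000):
    """Multiply each NRZ sample by cos(2*pi*f_sc*n/sample_rate)."""
    step = 2 * math.pi * subcarrier_freq / sample_rate
    return [level * math.cos(step * n) for n, level in enumerate(nrz)]


ones = bpsk_subcarrier([1.0] * 10)    # bit value 1 held for 10 samples
zeros = bpsk_subcarrier([-1.0] * 10)  # bit value 0: same carrier, inverted
```

With the default rates the subcarrier completes one cycle every 5 samples, and the waveform for a 0 bit is the exact negative of the waveform for a 1 bit.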
Module: apollo.fm_voice_subcarrier_mod
Type: gr.hier_block2
Purpose: FM-modulate audio onto 1.25 MHz subcarrier with +/-29 kHz deviation. Two modes: internal test tone (source block) or external audio input.
# Internal test tone (no input)
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
voice = fm_voice_subcarrier_mod(tone_freq=1000.0)
# External audio input
voice = fm_voice_subcarrier_mod(audio_input=True)
Caution
The I/O signature changes based on the audio_input parameter. When audio_input=False (default), the block is a source with no streaming input. When audio_input=True, it accepts one float input for the external audio stream.
| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| in0 | Input | float (or none) | External audio signal when audio_input=True; no input when False |
| out0 | Output | float | FM subcarrier at subcarrier_freq |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| sample_rate | float | 5120000 | Sample rate in Hz |
| subcarrier_freq | float | 1250000 | Voice subcarrier center frequency (VOICE_SUBCARRIER_HZ) |
| fm_deviation | float | 29000 | FM deviation in Hz (VOICE_FM_DEVIATION_HZ) |
| tone_freq | float | 1000.0 | Internal test tone frequency when audio_input=False |
| audio_input | bool | False | When True, accepts external float audio stream |

| Property | Type | Description |
| --- | --- | --- |
| tone_freq | float | Test tone frequency in Hz |
| subcarrier_freq | float | Subcarrier center frequency in Hz |
| fm_deviation | float | FM deviation in Hz |
| audio_input | bool | Whether block accepts external audio |
[audio source or input] -> frequency_modulator_fc(sensitivity) -> (mixer, 0); sig_source_c(subcarrier_freq) -> (mixer, 1); mixer -> complex_to_real -> output
Module: apollo.pm_mod
Type: gr.hier_block2
Purpose: Apply phase modulation at 0.133 rad peak deviation, producing complex baseband exp(j * phi(t)).
from apollo.pm_mod import pm_mod
blk = pm_mod(pm_deviation=0.133, sample_rate=5_120_000)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| in0 | Input | float | Composite modulating signal (sum of subcarriers) |
| out0 | Output | complex | PM complex baseband |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| pm_deviation | float | 0.133 | Peak PM deviation in radians (PM_PEAK_DEVIATION_RAD) |
| sample_rate | float | 5120000 | Sample rate in Hz |

| Method | Signature | Description |
| --- | --- | --- |
| get_pm_deviation | () -> float | Read current PM deviation |
| set_pm_deviation | (dev: float) -> None | Update PM deviation at runtime |
input -> multiply_const_ff(pm_deviation) -> phase_modulator_fc(1.0) -> output
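The modulation itself is just exp(j * pm_deviation * x). The sketch below applies it with `cmath` and recovers the phase with an ideal argument extraction. It stands in for the signal math only: there is no carrier offset, so no PLL is needed, and both function names are hypothetical.

```python
import cmath


def pm_modulate(samples, pm_deviation=0.133):
    """Scale the input by the peak deviation and map it onto the unit circle."""
    return [cmath.exp(1j * pm_deviation * x) for x in samples]


def pm_demodulate_ideal(iq):
    """Ideal phase extraction (no carrier tracking): arg of each sample."""
    return [cmath.phase(z) for z in iq]


composite = [-1.0, 0.0, 0.5, 1.0]
iq = pm_modulate(composite)
recovered = pm_demodulate_ideal(iq)
```

Every output sample has unit magnitude, and the recovered phase is the input scaled by 0.133 rad, so a full-scale input of 1.0 comes back as 0.133.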
Module: apollo.sco_mod
Type: gr.hier_block2
Purpose: Modulate 0-5V sensor voltage onto an FM subcarrier oscillator tone. Only used in FM downlink mode. 9 channels available (14.5 kHz to 165 kHz).
from apollo.sco_mod import sco_mod
blk = sco_mod(sco_number=5, sample_rate=5_120_000)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| in0 | Input | float | Sensor voltage (0.0 to 5.0 V) |
| out0 | Output | float | FM subcarrier tone |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| sco_number | int | 1 | SCO channel number (1-9). Raises ValueError if invalid |
| sample_rate | float | 5120000 | Sample rate in Hz |

| Property | Type | Description |
| --- | --- | --- |
| center_freq | float | Center frequency of this SCO channel in Hz |
| deviation_hz | float | FM deviation in Hz (+/- 7.5% of center) |
| sco_number | int | SCO channel number (1-9) |
input -> add_const_ff(-2.5) -> multiply_const_ff(1/2.5) -> frequency_modulator_fc -> (mixer, 0); sig_source_c(center_freq) -> (mixer, 1); mixer -> complex_to_real -> output
| SCO Number | Center Frequency | Deviation (+/-) | Bandwidth (15%) |
| --- | --- | --- | --- |
| 1 | 14,500 Hz | 1,087.5 Hz | 2,175 Hz |
| 2 | 22,000 Hz | 1,650 Hz | 3,300 Hz |
| 3 | 30,000 Hz | 2,250 Hz | 4,500 Hz |
| 4 | 40,000 Hz | 3,000 Hz | 6,000 Hz |
| 5 | 52,500 Hz | 3,937.5 Hz | 7,875 Hz |
| 6 | 70,000 Hz | 5,250 Hz | 10,500 Hz |
| 7 | 95,000 Hz | 7,125 Hz | 14,250 Hz |
| 8 | 125,000 Hz | 9,375 Hz | 18,750 Hz |
| 9 | 165,000 Hz | 12,375 Hz | 24,750 Hz |
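On the transmit side, the voltage-to-frequency relationship can be sketched as follows. It assumes the modulator sensitivity is chosen so that a normalized input of +/-1.0 produces exactly the listed deviation; `sco_tone_frequency` is a hypothetical helper, not a package function.

```python
def sco_tone_frequency(voltage, center_hz):
    """Instantaneous SCO tone frequency for a 0-5 V sensor reading."""
    normalized = (voltage - 2.5) / 2.5   # 0 V -> -1.0, 5 V -> +1.0
    deviation_hz = center_hz * 7.5 / 100  # +/- 7.5% of center
    return center_hz + normalized * deviation_hz


# Channel 5 (52.5 kHz center) at the bottom, middle, and top of the range.
low, mid, high = (sco_tone_frequency(v, 52_500) for v in (0.0, 2.5, 5.0))
```

Under that assumption channel 5 sweeps from 48,562.5 Hz at 0 V to 56,437.5 Hz at 5 V, centered on 52,500 Hz at 2.5 V.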
Module: apollo.fm_mod
Type: gr.hier_block2
Purpose: Apply frequency modulation to produce complex baseband. Used in FM downlink mode.
from apollo.fm_mod import fm_mod
blk = fm_mod(fm_deviation_hz=500_000, sample_rate=5_120_000)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| in0 | Input | float | Composite modulating signal (sum of SCO subcarriers) |
| out0 | Output | complex | FM complex baseband |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| fm_deviation_hz | float | 500000 | Max frequency deviation in Hz (FM_CARRIER_DEVIATION_HZ) |
| sample_rate | float | 5120000 | Sample rate in Hz |

| Method | Signature | Description |
| --- | --- | --- |
| get_fm_deviation | () -> float | Read current FM deviation |
| set_fm_deviation | (hz: float) -> None | Update FM deviation at runtime |
input -> multiply_const_ff(1.0) -> frequency_modulator_fc(sensitivity) -> output
where sensitivity = 2*pi*fm_deviation_hz/sample_rate.
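A phase-accumulator model shows what this sensitivity means in practice: each input sample advances the output phase by sensitivity * x radians. The sketch below is a plain-Python stand-in for that behavior, with hypothetical function names.

```python
import cmath
import math


def fm_sensitivity(fm_deviation_hz=500_000, sample_rate=5_120_000):
    """Radians of phase advance per sample for a full-scale (1.0) input."""
    return 2 * math.pi * fm_deviation_hz / sample_rate


def fm_modulate(samples, sensitivity):
    """Accumulate phase sample by sample and emit exp(j*phase)."""
    phase = 0.0
    out = []
    for x in samples:
        phase += sensitivity * x
        out.append(cmath.exp(1j * phase))
    return out


k = fm_sensitivity()
iq = fm_modulate([1.0] * 4, k)
# Phase difference between consecutive samples equals the sensitivity.
step = cmath.phase(iq[1] * iq[0].conjugate())
```

At the defaults the sensitivity is about 0.614 rad/sample, i.e. a full-scale input shifts the carrier by 500 kHz.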
Module: apollo.usb_signal_source
Type: gr.hier_block2
Purpose: Complete Apollo USB transmit chain in one block — the TX counterpart to usb_downlink_receiver.
from apollo.usb_signal_source import usb_signal_source
blk = usb_signal_source(voice_enabled=True, snr_db=20.0)

| Port | Direction | Type | Description |
| --- | --- | --- | --- |
| (none) | Input | (none) | Source block; no streaming input |
| out0 | Output | complex | PM-modulated complex baseband at sample_rate |
| "frame_data" | Input | Message | Forwarded to pcm_frame_source for dynamic payload |

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| sample_rate | float | 5120000 | Baseband sample rate in Hz |
| bit_rate | int | 51200 | PCM bit rate in bps |
| pm_deviation | float | 0.133 | PM peak deviation in radians |
| voice_enabled | bool | False | Include 1.25 MHz FM voice subcarrier |
| voice_tone_hz | float | 1000.0 | Voice test tone frequency when voice_enabled=True |
| snr_db | float \| None | None | Add AWGN at this SNR. None means no noise |
pcm_frame_source -> nrz_encoder -> bpsk_subcarrier_mod -+-> add_ff -> pm_mod -> [+AWGN] -> output
                     fm_voice_subcarrier_mod (x0.764) --+
                     (only when voice_enabled=True)
Voice scale factor: 1.68 / 2.2 = 0.764 (per IMPL_SPEC power ratio — PCM at 2.2 Vpp, voice at 1.68 Vpp).
The internal blocks are exposed as instance attributes for runtime inspection or parameter adjustment:
Attribute | Type | Block
self.frame_src | pcm_frame_source | Frame generator
self.nrz | nrz_encoder | NRZ line encoder
self.bpsk | bpsk_subcarrier_mod | BPSK modulator
self.voice | fm_voice_subcarrier_mod | Voice modulator (when voice_enabled)
self.voice_gain | multiply_const_ff | Voice level scaling (when voice_enabled)
self.adder | add_ff | Subcarrier summer (when voice_enabled)
self.pm | pm_mod | Phase modulator
self.noise | noise_source_c | AWGN source (when snr_db is set)
Module: apollo.fm_signal_source
Type: gr.hier_block2
Purpose: Complete Apollo FM downlink transmit chain — generates SCO channels at test voltages, sums them, and applies FM carrier modulation.
from apollo.fm_signal_source import fm_signal_source
blk = fm_signal_source(channels=[1, 5, 9], test_voltages={1: 1.0, 5: 2.5, 9: 4.0}, snr_db=30)

Port | Direction | Type | Description
(none) | Input | (none) | Source block, no streaming input
out0 | Output | complex | FM-modulated complex baseband at sample_rate

Parameter | Type | Default | Description
channels | list[int] | [1, 5, 9] | SCO channel numbers to generate (1-9)
test_voltages | dict[int, float] | {ch: 2.5 for each} | DC voltage per channel (0.0-5.0 V)
sample_rate | float | 5120000 | Baseband sample rate in Hz
fm_deviation_hz | float | 500000 | Carrier FM deviation in Hz
snr_db | float|None | None | Add AWGN at this SNR. None means no noise

Property | Type | Description
channels | list[int] | SCO channel numbers being generated
test_voltages | dict[int, float] | Current voltage per channel
fm_deviation_hz | float | Carrier FM deviation in Hz

dc_source(v1) -> sco_mod(ch1) -+-> add_ff -> fm_mod -> [+AWGN] -> output
dc_source(v2) -> sco_mod(ch2) -+
dc_source(vN) -> sco_mod(chN) -+
Module: apollo.fm_downlink_receiver
Type: gr.hier_block2
Purpose: Complete Apollo FM downlink receiver — complex baseband input to recovered SCO voltages on streaming float outputs.
from apollo.fm_downlink_receiver import fm_downlink_receiver
blk = fm_downlink_receiver(channels=[1, 5, 9])

Port | Direction | Type | Description
in0 | Input | complex (streaming) | Baseband IQ samples at sample_rate
out0..N-1 | Output | float (streaming) | Recovered 0-5 V sensor voltage per SCO channel

Output port ordering matches the channels list: output 0 = channels[0], etc.
Parameter | Type | Default | Description
channels | list[int] | [1, 5, 9] | SCO channel numbers to decode (1-9)
sample_rate | float | 5120000 | Input sample rate in Hz
carrier_pll_bw | float | 0.02 | FM carrier recovery loop bandwidth (rad/sample)
fm_deviation_hz | float | 500000 | Expected carrier FM deviation in Hz

Property | Type | Description
channels | list[int] | SCO channel numbers being decoded

Method | Signature | Description
get_sco_demod | (channel: int) -> sco_demod | Access a specific SCO demodulator for runtime inspection

complex in -> fm_demod -+-> sco_demod(ch1) -> output[0]
                        +-> sco_demod(ch2) -> output[1]
                        +-> sco_demod(chN) -> output[N-1]
The uplink carries ground-station commands to the spacecraft as 15-bit AGC words at 2 kbps NRZ on a 70 kHz FM subcarrier, phase-modulated onto the 2106.40625 MHz uplink carrier at 1.0 rad peak deviation. Each word triggers an UPRUPT interrupt in the AGC flight software.
Module: apollo.uplink_word_codec
Type: Pure-Python class
Purpose: Serialize (channel, value) pairs into a continuous NRZ bit stream with configurable inter-word gap.
from apollo.uplink_word_codec import UplinkSerializerEngine
engine = UplinkSerializerEngine(inter_word_gap=3)
engine.add_words([(37, 0x4400), (37, 0x0400)])
bits = engine.next_bits(36)  # 2 words x (15 data bits + 3 gap bits)

Parameter | Type | Default | Description
inter_word_gap | int | 3 | Number of zero-bit periods between consecutive words (UPLINK_INTER_WORD_GAP)

Method | Signature | Description
add_words | (pairs: list[tuple[int, int]]) -> None | Queue (channel, value) pairs for serialization. Each value is serialized as 15 bits MSB-first, followed by inter_word_gap zeros. The channel is not transmitted; it is used only for metadata/logging
next_bits | (n: int) -> list[int] | Pull exactly n bits from the queue. Returns queued data bits when available, zeros otherwise (idle carrier)

Property | Type | Description
pending | int | Number of bits remaining in the queue
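The serialization rule described above (15 bits MSB-first, then gap zeros, with zeros returned when idle) can be sketched in a few lines. `SerializerSketch` is a hypothetical stand-in written for illustration; it is not the source of UplinkSerializerEngine and may differ from it in detail.

```python
from collections import deque

WORD_BITS = 15  # AGC uplink word width


class SerializerSketch:
    """Illustrative stand-in for UplinkSerializerEngine (not the library code)."""

    def __init__(self, inter_word_gap=3):
        self.inter_word_gap = inter_word_gap
        self._queue = deque()

    def add_words(self, pairs):
        for _channel, value in pairs:  # channel is metadata only, never sent
            for shift in range(WORD_BITS - 1, -1, -1):  # MSB first
                self._queue.append((value >> shift) & 1)
            self._queue.extend([0] * self.inter_word_gap)

    def next_bits(self, n):
        # Queued bits first, then zeros to represent the idle carrier.
        return [self._queue.popleft() if self._queue else 0 for _ in range(n)]

    @property
    def pending(self):
        return len(self._queue)


ser = SerializerSketch(inter_word_gap=3)
ser.add_words([(37, 0x4400), (37, 0x0400)])
bits = ser.next_bits(36)
print(bits[:18])   # first word plus its gap
print(bits[18:])   # second word plus its gap
```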
Module: apollo.uplink_word_codec
Type: Pure-Python class
Purpose: Reassemble 15-bit AGC words from a recovered NRZ bit stream using a two-phase state machine (acquisition and locked framing).
from apollo.uplink_word_codec import UplinkDeserializerEngine
engine = UplinkDeserializerEngine(inter_word_gap=3)
pairs = engine.process_bits([1, 0, 1, 1, ...])
# Returns: [(37, 0x4400), ...]

Parameter | Type | Default | Description
inter_word_gap | int | 3 | Expected number of zero bits between words (UPLINK_INTER_WORD_GAP)
channel | int | 37 | AGC channel to assign to recovered words (AGC_CH_INLINK, 045 octal)

Method | Signature | Description
process_bits | (bits: list[int]) -> list[tuple[int, int]] | Process a batch of recovered bits. Returns a list of (channel, value) tuples, one per completed word
reset | () -> None | Clear internal state for a fresh decode pass

The deserializer uses a two-phase approach:
Acquisition — Scans for the first non-zero bit (all DSKY keycodes have at least one set bit in the upper 5 bits, so the first transmitted word always starts with a 1).
Locked — Uses fixed framing: collects exactly 15 bits per word, skips exactly inter_word_gap bits, then collects the next 15, etc. The lock releases after seeing a null word (all zeros), returning to acquisition.
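The two phases above can be sketched as a small state machine. This is an illustration under stated assumptions, not the library's code: `DeserializerSketch` is a hypothetical name, and it assumes that a null word releases the lock without being emitted.

```python
WORD_BITS = 15


class DeserializerSketch:
    """Illustrative two-phase framer (acquisition, then locked)."""

    def __init__(self, inter_word_gap=3, channel=37):
        self.inter_word_gap = inter_word_gap
        self.channel = channel
        self.reset()

    def reset(self):
        self._locked = False
        self._word = []   # bits collected for the current word
        self._skip = 0    # gap bits still to discard

    def process_bits(self, bits):
        out = []
        for bit in bits:
            if not self._locked:
                if bit == 0:
                    continue          # acquisition: ignore idle zeros
                self._locked = True   # first 1 is taken as the MSB of a word
            if self._skip:
                self._skip -= 1       # locked: discard inter-word gap bits
                continue
            self._word.append(bit)
            if len(self._word) == WORD_BITS:
                value = int("".join(str(b) for b in self._word), 2)
                self._word = []
                if value == 0:
                    self._locked = False   # null word: back to acquisition
                else:
                    out.append((self.channel, value))
                    self._skip = self.inter_word_gap
        return out


def word_bits(value):
    """15-bit MSB-first expansion of one word (helper for the demo)."""
    return [(value >> s) & 1 for s in range(WORD_BITS - 1, -1, -1)]


stream = [0] * 5 + word_bits(0x4400) + [0] * 3 + word_bits(0x0400) + [0] * 23
pairs = DeserializerSketch().process_bits(stream)
print([(ch, hex(v)) for ch, v in pairs])
```

Note that the second word (0x0400) begins with a 0 bit; it is still framed correctly because the locked phase counts bits rather than searching for a leading 1.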
Module: apollo.uplink_word_codec
Type: gr.sync_block
Purpose: GNU Radio source block wrapping UplinkSerializerEngine. Accepts word PDUs on message input, outputs continuous NRZ byte stream.
from apollo.uplink_word_codec import uplink_word_serializer
blk = uplink_word_serializer(inter_word_gap=3)

Port | Direction | Type | Description
(none) | Input | (none) | Source block, no streaming input
out0 | Output | byte | NRZ bit stream (values 0 or 1). Zeros when idle
"words" | Input | Message (PDU) | Word injection: PDU with metadata dict containing "channel" and "value" keys, or a pair (channel . value)

Parameter | Type | Default | Description
inter_word_gap | int | 3 | Number of zero-bit periods between words (UPLINK_INTER_WORD_GAP)

The "words" message port accepts the same format emitted by uplink_encoder:
Key | PMT Type | Description
"channel" | pmt.from_long | AGC channel number (37 for INLINK)
"value" | pmt.from_long | 15-bit word value
Module: apollo.uplink_word_codec
Type: gr.basic_block
Purpose: GNU Radio block wrapping UplinkDeserializerEngine. Consumes byte stream, emits PDU messages for each recovered 15-bit word.
from apollo.uplink_word_codec import uplink_word_deserializer
blk = uplink_word_deserializer(inter_word_gap=3, channel=37)

Port | Direction | Type | Description
in0 | Input | byte (streaming) | NRZ bit stream (values 0 or 1) from binary slicer
"commands" | Output | Message (PDU) | Recovered word PDUs

Parameter | Type | Default | Description
inter_word_gap | int | 3 | Expected number of zero bits between words (UPLINK_INTER_WORD_GAP)
channel | int | 37 | AGC channel to assign to recovered words (AGC_CH_INLINK)

Each output PDU is pmt.cons(metadata, pmt.PMT_NIL):
Metadata dict:
Key | PMT Type | Description
"channel" | pmt.from_long | AGC channel number (37)
"value" | pmt.from_long | Recovered 15-bit word value
Module: apollo.usb_uplink_source
Type: gr.hier_block2
Purpose: Complete Apollo USB uplink transmit chain — serializes 15-bit AGC words into NRZ bits, FM-modulates onto 70 kHz subcarrier, and applies PM modulation to produce complex baseband.
from apollo.usb_uplink_source import usb_uplink_source
blk = usb_uplink_source(sample_rate=5_120_000, snr_db=20.0)

Port | Direction | Type | Description
(none) | Input | (none) | Source block, no streaming input
out0 | Output | complex | PM-modulated complex baseband at sample_rate
"words" | Input | Message | Forwarded to uplink_word_serializer for command injection. Accepts the same PDU format emitted by uplink_encoder

Parameter | Type | Default | Description
sample_rate | float | 5120000 | Baseband sample rate in Hz (SAMPLE_RATE_BASEBAND)
bit_rate | int | 2000 | Uplink data bit rate in bps (UPLINK_DATA_BIT_RATE)
pm_deviation | float | 1.0 | Peak PM deviation in radians (UPLINK_PM_DEVIATION_RAD)
snr_db | float|None | None | Add AWGN at this SNR. None means no noise

word_ser -> nrz_encoder(2 kbps) -> fm_mod(sensitivity) -> (mixer, 0)
sig_source_c(70 kHz) -> (mixer, 1)
mixer -> complex_to_real -> pm_mod(1.0 rad) -> [+AWGN] -> output
Attribute | Type | Block
self.word_ser | uplink_word_serializer | Word serializer
self.nrz | nrz_encoder | NRZ line encoder
self.fm_mod | frequency_modulator_fc | FM modulator
self.lo | sig_source_c | 70 kHz LO
self.mixer | multiply_cc | Subcarrier upconverter
self.to_real | complex_to_real | Complex-to-real conversion
self.pm | pm_mod | Phase modulator
self.noise | noise_source_c | AWGN source (when snr_db is set)
Module: apollo.usb_uplink_receiver
Type: gr.hier_block2
Purpose: Complete Apollo USB uplink receiver chain — complex baseband input to recovered command PDUs. The spacecraft-side counterpart to usb_uplink_source.
from apollo.usb_uplink_receiver import usb_uplink_receiver
blk = usb_uplink_receiver(sample_rate=5_120_000)

Port | Direction | Type | Description
in0 | Input | complex (streaming) | Baseband IQ samples at sample_rate
"commands" | Output | Message (PDU) | Decoded (channel, value) PDUs for AGC bridge

Parameter | Type | Default | Description
sample_rate | float | 5120000 | Input sample rate in Hz (SAMPLE_RATE_BASEBAND)
bit_rate | int | 2000 | Uplink data bit rate in bps (UPLINK_DATA_BIT_RATE)
carrier_pll_bw | float | 0.02 | PM demodulator PLL bandwidth (rad/sample)
subcarrier_bw | float | 20000 | 70 kHz subcarrier bandpass filter width (Hz)

complex in -> pm_demod -> subcarrier_extract(70 kHz, BW=20 kHz)
           -> quadrature_demod_cf(FM) -> matched_filter(1/samp_per_bit)
           -> keep_one_in_n(samp_per_bit) -> binary_slicer_fb
           -> uplink_word_deserializer -> "commands" message output
Attribute | Type | Block
self.pm | pm_demod | PM demodulator
self.sc_extract | subcarrier_extract | 70 kHz subcarrier extractor
self.fm_demod | quadrature_demod_cf | FM discriminator
self.matched_filter | fir_filter_fff | Integrate-and-dump matched filter
self.decimator | keep_one_in_n | Bit-rate decimator
self.slicer | binary_slicer_fb | Hard-decision binary slicer
self.deser | uplink_word_deserializer | Word reassembler
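The last three stages before the deserializer (boxcar matched filter, bit-rate decimation, hard decision) can be illustrated without GNU Radio. The sketch below assumes bit-aligned input and a small samples-per-bit value for readability; at the default rates the real chain would use 5,120,000 / 2,000 = 2,560 samples per bit. `moving_average` and `recover_bits` are hypothetical names, and the sample alignment inside the actual blocks may differ.

```python
def moving_average(samples, n):
    """FIR with n taps of 1/n (boxcar matched filter for rectangular bits)."""
    out = []
    acc = 0.0
    window = [0.0] * n
    for i, x in enumerate(samples):
        acc += x - window[i % n]   # slide the window by one sample
        window[i % n] = x
        out.append(acc / n)
    return out


def recover_bits(samples, samp_per_bit):
    filtered = moving_average(samples, samp_per_bit)
    # Keep the last sample of each bit period, where the boxcar spans one bit.
    decimated = filtered[samp_per_bit - 1::samp_per_bit]
    return [1 if x >= 0.0 else 0 for x in decimated]   # slice at zero


tx_bits = [1, 0, 1, 1, 0]
samp_per_bit = 8
samples = []
for b in tx_bits:
    level = 1.0 if b else -1.0
    # Add a small alternating disturbance to stand in for noise.
    samples += [level + 0.2 * (-1) ** k for k in range(samp_per_bit)]

rx_bits = recover_bits(samples, samp_per_bit)
print(rx_bits)
```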
The Apollo ranging system measures spacecraft distance by transmitting a composite pseudo-random noise (PRN) code and correlating the echo. The code combines five component sequences (CL, X, A, B, C) using majority-vote logic and XOR operations. Combined code length: 5,456,682 chips (~5.49 seconds at 993,963 chips/sec).
Module: apollo.ranging
Type: Pure-Python class
Purpose: Generate Apollo PRN ranging code sequences — individual component codes or the full composite sequence.
from apollo.ranging import RangingCodeGenerator
gen = RangingCodeGenerator()
composite = gen.generate_sequence()    # Full 5,456,682-chip composite code
a_code = gen.generate_a()              # 31-chip A component
x_code = gen.generate_component("x")   # By name

Method | Signature | Description
generate_cl | (n_chips: int|None) -> np.ndarray | Generate CL component: alternating 0, 1 clock. Default length: 2
generate_x | (n_chips: int|None) -> np.ndarray | Generate X component: 11-chip non-LFSR sequence with custom feedback logic
generate_a | (n_chips: int|None) -> np.ndarray | Generate A component: 31-chip LFSR, 5 bits, taps [2,0] (x^5+x^2+1)
generate_b | (n_chips: int|None) -> np.ndarray | Generate B component: 63-chip LFSR, 6 bits, taps [1,0] (x^6+x+1)
generate_c | (n_chips: int|None) -> np.ndarray | Generate C component: 127-chip LFSR, 7 bits, taps [1,0] (x^7+x+1)
generate_component | (name: str, n_chips: int|None) -> np.ndarray | Generate a named component ("cl", "x", "a", "b", "c"). Raises ValueError if name is not recognized
generate_sequence | (n_chips: int|None) -> np.ndarray | Generate the full composite PRN code. Default: one full period (5,456,682 chips)

All methods return uint8 numpy arrays of 0/1 values. When n_chips is None, generates one full period for that component.
Component | Length | Register | Taps | Init | Type
CL | 2 | n/a | n/a | n/a | Alternating clock
X | 11 | 5-bit | Custom feedback | 0b10110 (22) | Non-LFSR
A | 31 | 5-bit | [2, 0] | 0x1F (all ones) | Maximal-length LFSR
B | 63 | 6-bit | [1, 0] | 0x3F (all ones) | Maximal-length LFSR
C | 127 | 7-bit | [1, 0] | 0x7F (all ones) | Maximal-length LFSR
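The A, B, and C rows can be checked with a generic Fibonacci LFSR. The sketch below is an assumption-laden illustration: `lfsr_sequence` is a hypothetical helper, and it interprets the taps as bit indices of the current state with output taken from bit 0. The library's bit ordering may differ, so the chip patterns are not guaranteed to match `RangingCodeGenerator`, but the period and balance properties hold for any maximal-length register of that size.

```python
from typing import List, Optional


def lfsr_sequence(n_bits: int, taps: List[int], n_chips: Optional[int] = None) -> List[int]:
    """Fibonacci LFSR: output bit 0, feed back the XOR of the tapped bits."""
    state = (1 << n_bits) - 1                       # all-ones initial state
    length = (1 << n_bits) - 1 if n_chips is None else n_chips
    chips = []
    for _ in range(length):
        chips.append(state & 1)
        feedback = 0
        for tap in taps:
            feedback ^= (state >> tap) & 1
        state = (state >> 1) | (feedback << (n_bits - 1))
    return chips


# Register sizes and taps copied from the component table.
COMPONENTS = {"a": (5, [2, 0]), "b": (6, [1, 0]), "c": (7, [1, 0])}

for name, (n_bits, taps) in COMPONENTS.items():
    period = (1 << n_bits) - 1
    seq = lfsr_sequence(n_bits, taps, 2 * period)
    ones = sum(seq[:period])
    repeats = seq[:period] == seq[period:]
    print(f"{name}: period={period} ones={ones} zeros={period - ones} repeats={repeats}")
```

A maximal-length sequence of an n-bit register has 2^(n-1) ones and 2^(n-1) - 1 zeros per period, which is the balance property the self-test described later checks.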
The composite code reproduces Shirriff’s calc() algorithm: on even output chips (ck=0), all shift registers advance and the output is computed from feedback bits via majority logic. On odd chips (ck=1), the output is flipped.
Combination logic per step: out = (NOT(xnew) AND maj(anew, bnew, cnew)) XOR ck
where maj(A,B,C) = (A&B) | (A&C) | (B&C).
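The per-chip combination rule translates directly into bitwise operations. This sketch covers only the output logic for a single step, given the component bits; it does not reproduce the register sequencing of the full composite generator. The function name `combine` is illustrative.

```python
def maj(a: int, b: int, c: int) -> int:
    """Majority vote of three bits."""
    return (a & b) | (a & c) | (b & c)


def combine(x: int, a: int, b: int, c: int, ck: int) -> int:
    """out = (NOT(x) AND maj(a, b, c)) XOR ck, with every operand 0 or 1."""
    return (((x ^ 1) & maj(a, b, c)) ^ ck) & 1


# Print the truth table for the even-chip case (ck = 0).
for x in (0, 1):
    for a in (0, 1):
        for b in (0, 1):
            for c in (0, 1):
                print(x, a, b, c, "->", combine(x, a, b, c, ck=0))
```

Because ck is XORed last, the odd-chip output is always the complement of the even-chip output for the same component bits.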
Module: apollo.ranging
Type: Pure-Python class
Purpose: Cross-correlate received NRZ samples with the known PRN code using FFT-based correlation for range measurement.
from apollo.ranging import RangingCorrelator
correlator = RangingCorrelator(sample_rate=5_120_000, two_way=True)
result = correlator.correlate(received_samples)
print(f"Range: {result['range_m']:.1f} m")

Parameter | Type | Default | Description
chip_rate | int | 993963 | PRN chip rate in chips/sec (RANGING_CHIP_RATE_HZ)
sample_rate | float | 993963 | Input sample rate in Hz. When equal to chip_rate, each chip is one sample
two_way | bool | True | If True, range is halved (signal traveled ground to spacecraft and back)

Method | Signature | Description
correlate | (received: np.ndarray, code_chips: int|None) -> dict | Cross-correlate received float samples with PRN reference. Returns measurement dict

Property | Type | Description
chip_rate | int | PRN chip rate in chips/sec
sample_rate | float | Input sample rate in Hz
two_way | bool | Two-way range mode
samples_per_chip | float | Ratio sample_rate / chip_rate

The dict returned by correlate() contains:

Key | Type | Description
delay_samples | int | Peak correlation index in samples
delay_chips | float | Delay measured in chip periods
range_m | float | Computed range in meters (halved if two_way=True)
correlation_peak | float | Absolute value of the correlation peak
peak_to_avg_ratio | float | Peak divided by mean correlation magnitude. Higher values indicate cleaner detection
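To show what the delay and quality fields mean, the sketch below finds the delay of a circularly shifted toy code. It deliberately swaps in a direct O(N^2) circular correlation instead of the FFT-based method, and uses a 31-chip sequence at one sample per chip rather than the full ranging code. All function names here are hypothetical and the code is not taken from RangingCorrelator.

```python
def m_sequence_31():
    """31-chip maximal-length toy code (5-bit LFSR, taps [2, 0], all-ones start)."""
    state, chips = 0b11111, []
    for _ in range(31):
        chips.append(state & 1)
        fb = ((state >> 2) ^ state) & 1
        state = (state >> 1) | (fb << 4)
    return chips


def correlate_circular(received, reference):
    """Direct circular cross-correlation; an FFT yields the same values faster."""
    n = len(reference)
    return [sum(received[(k + lag) % n] * reference[k] for k in range(n))
            for lag in range(n)]


def measure_delay(received, reference):
    mags = [abs(v) for v in correlate_circular(received, reference)]
    peak = max(mags)
    return {
        "delay_samples": mags.index(peak),
        "correlation_peak": peak,
        "peak_to_avg_ratio": peak / (sum(mags) / len(mags)),
    }


reference = [2 * c - 1 for c in m_sequence_31()]   # NRZ: 0 -> -1, 1 -> +1
true_delay = 7
received = reference[-true_delay:] + reference[:-true_delay]  # delayed copy
result = measure_delay(received, reference)
print(result)
```

For a maximal-length sequence every off-peak lag correlates to -1, so the peak stands well above the average; noise or partial-period correlation lowers that ratio.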
Module: apollo.ranging
Type: Pure-Python function
Purpose: Convert chip delay to range in meters.
from apollo.ranging import chips_to_range_m
distance = chips_to_range_m(delay_chips=100.0, two_way=True)

def chips_to_range_m(delay_chips: float, two_way: bool = True) -> float

Parameter | Type | Default | Description
delay_chips | float | (required) | Delay measured in chip periods
two_way | bool | True | If True, divide distance by 2 (round-trip signal path)

Range in meters (float). Uses SPEED_OF_LIGHT_M_S (299,792,458 m/s) and RANGING_CHIP_RATE_HZ (993,963 chip/s).
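The conversion is delay in seconds times the speed of light, halved for a round trip. A minimal sketch using the two constants quoted above; `chips_to_range_sketch` is a hypothetical name used so as not to imply this is the library source.

```python
SPEED_OF_LIGHT_M_S = 299_792_458   # value quoted in the text
RANGING_CHIP_RATE_HZ = 993_963     # value quoted in the text


def chips_to_range_sketch(delay_chips: float, two_way: bool = True) -> float:
    delay_s = delay_chips / RANGING_CHIP_RATE_HZ      # chips -> seconds
    distance_m = delay_s * SPEED_OF_LIGHT_M_S         # seconds -> meters
    return distance_m / 2.0 if two_way else distance_m


print(f"{chips_to_range_sketch(100.0):.1f} m two-way")
print(f"{chips_to_range_sketch(100.0, two_way=False):.1f} m one-way")
```

One chip corresponds to about 301.6 m of signal path, or about 150.8 m of range in two-way mode.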
Module: apollo.ranging
Type: Pure-Python function
Purpose: Convert range in meters to chip delay.
from apollo.ranging import range_m_to_chips
chips = range_m_to_chips(range_m=384_400_000, two_way=True)

def range_m_to_chips(range_m: float, two_way: bool = True) -> float

Parameter | Type | Default | Description
range_m | float | (required) | Distance in meters
two_way | bool | True | If True, compute round-trip delay (distance is doubled before conversion)

Delay in chip periods (float).
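The inverse conversion doubles the distance for a round trip, divides by the speed of light, and multiplies by the chip rate. The sketch below applies it to the example distance of 384,400,000 m; `range_to_chips_sketch` is a hypothetical name, and the constants are the values quoted for the ranging module.

```python
SPEED_OF_LIGHT_M_S = 299_792_458
RANGING_CHIP_RATE_HZ = 993_963
RANGING_CODE_LENGTH = 5_456_682    # chips in one composite code period


def range_to_chips_sketch(range_m: float, two_way: bool = True) -> float:
    path_m = 2.0 * range_m if two_way else range_m    # total signal path
    return path_m / SPEED_OF_LIGHT_M_S * RANGING_CHIP_RATE_HZ


moon_chips = range_to_chips_sketch(384_400_000)
print(f"{moon_chips:,.0f} chips round trip")
print("fits in one code period:", moon_chips < RANGING_CODE_LENGTH)
```

At that distance the round-trip delay is roughly 2.55 million chips, less than one code period, so the delay can be read from a single correlation peak without ambiguity.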
Module: apollo.ranging
Type: Pure-Python function
Purpose: Self-test for code correctness — verify all component codes have correct length, periodicity, and balance properties.
from apollo.ranging import verify_code_properties
results = verify_code_properties()
for name, props in results.items():
    print(f"{name}: {props}")

def verify_code_properties() -> dict
Parameters: none.
Returns a dict with one entry per component ("cl", "x", "a", "b", "c") plus two summary entries ("length_product" and "composite_sample"). Each value is a property dict.
Per-component dict:
Key | Type | Description
length | int | Actual generated length
length_correct | bool | Whether length matches the expected constant
ones_count | int | Number of 1-chips in the sequence
zeros_count | int | Number of 0-chips in the sequence
periodic | bool | Whether 2x generation repeats exactly
balance_correct | bool | Whether ones/zeros ratio matches LFSR theory (for A, B, C, CL)
length_product dict:
Key | Type | Description
expected | int | Product of all component lengths (2 x 11 x 31 x 63 x 127 = 5,456,682)
matches_constant | bool | Whether RANGING_CODE_LENGTH equals the product
composite_sample dict:
Key | Type | Description
length | int | Length of the test sample (10,000 chips)
ones_count | int | Number of 1-chips
zeros_count | int | Number of 0-chips
balance | float | Ratio of ones to total (near 0.5 for a balanced code)
Module: apollo.ranging_source
Type: gr.sync_block
Purpose: GNU Radio source block producing a continuous PRN ranging chip stream. Pre-generates the full code period and cycles through it.
from apollo.ranging_source import ranging_source
Port | Direction | Type | Description
(none) | Input | (none) | Source block, no streaming input
out0 | Output | byte | PRN chip stream (values 0 or 1), repeating every 5,456,682 chips

Parameters: none. The code period and chip rate are determined by the ranging constants.
Module: apollo.ranging_mod
Type: gr.hier_block2
Purpose: NRZ-encode PRN chips at the baseband sample rate. Converts chip stream (bytes 0/1) to float NRZ waveform (+1/-1) suitable for summing with other subcarriers before PM modulation.
from apollo.ranging_mod import ranging_mod
blk = ranging_mod(chip_rate=993_963, sample_rate=5_120_000)

Port | Direction | Type | Description
in0 | Input | byte | Chip stream from ranging_source (values 0 or 1)
out0 | Output | float | NRZ waveform (+1.0 / -1.0) at sample_rate

Parameter | Type | Default | Description
chip_rate | int | 993963 | PRN chip rate in chips/sec (RANGING_CHIP_RATE_HZ)
sample_rate | float | 5120000 | Output sample rate in Hz (SAMPLE_RATE_BASEBAND)

Property | Type | Description
samples_per_chip | int | Samples per chip period: int(sample_rate / chip_rate)

input -> char_to_float -> multiply_const_ff(2.0) -> add_const_ff(-1.0) -> repeat(samples_per_chip) -> output
Note
At a 5.12 MHz sample rate with 993,963 chips/s, samples_per_chip is 5 (integer truncation of 5.1511). The block is functionally identical to nrz_encoder, but configured for the ranging chip rate instead of the PCM bit rate.
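The block chain (char_to_float, multiply by 2.0, add -1.0, repeat) amounts to mapping each chip to -1.0 or +1.0 and holding it for samples_per_chip samples. A pure-Python sketch of that mapping follows; `ranging_nrz` is a hypothetical helper for illustration, not part of gr-apollo.

```python
def ranging_nrz(chips, chip_rate=993_963, sample_rate=5_120_000):
    """Map 0/1 chips to -1.0/+1.0 and hold each for samples_per_chip samples."""
    samples_per_chip = int(sample_rate / chip_rate)   # truncates to 5 at the defaults
    out = []
    for chip in chips:
        level = 2.0 * float(chip) - 1.0          # char_to_float, x2.0, then -1.0
        out.extend([level] * samples_per_chip)   # repeat(samples_per_chip)
    return out


waveform = ranging_nrz([0, 1, 1, 0])
print(len(waveform), waveform[:10])
```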
Module: apollo.ranging_demod
Type: gr.basic_block
Purpose: FFT-based ranging demodulator. Accumulates samples in batches, cross-correlates with the known PRN code, and emits range measurement PDUs.
from apollo.ranging_demod import ranging_demod
blk = ranging_demod(sample_rate=5_120_000, correlation_length=100_000)

Port | Direction | Type | Description
in0 | Input | float (streaming) | PM demod output or filtered ranging signal
"range" | Output | Message (PDU) | Range measurement PDUs, one per correlation batch

Parameter | Type | Default | Description
chip_rate | int | 993963 | PRN chip rate in chips/sec (RANGING_CHIP_RATE_HZ)
sample_rate | float | 5120000 | Input sample rate in Hz (SAMPLE_RATE_BASEBAND)
correlation_length | int | 100000 | Number of samples to accumulate per correlation batch. Longer batches improve SNR at the cost of measurement rate
two_way | bool | True | If True, range is halved (round-trip signal path)

Each PDU is pmt.cons(metadata, pmt.PMT_NIL):
Metadata dict:
Key | PMT Type | Description
"delay_chips" | pmt.from_double | Measured delay in chip periods
"range_m" | pmt.from_double | Computed range in meters
"correlation_peak" | pmt.from_double | Absolute value of the correlation peak
"peak_to_avg_ratio" | pmt.from_double | Peak-to-average ratio (detection quality metric)
"timestamp" | pmt.from_double | time.time() when the measurement was emitted