Connect to Virtual AGC

The Virtual AGC project provides yaAGC, a cycle-accurate emulator of the Apollo Guidance Computer (AGC). gr-apollo can connect to it over TCP, feeding decoded downlink telemetry into the emulator and receiving uplink commands. This guide covers both the standalone Python client and the GNU Radio block.

  1. Install Virtual AGC. Build from source or use a pre-built binary from the Virtual AGC project. The key binary is yaAGC.

  2. Start the AGC emulator. Launch yaAGC with a mission rope (flight software image):

    yaAGC --port=19697 Luminary099.bin

    The emulator listens on TCP port 19697 by default.

  3. Install gr-apollo.

    uv pip install -e .

    The AGCBridgeClient and UplinkEncoder are pure Python — they work without GNU Radio installed.
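Before starting a client, it can help to confirm that yaAGC is actually accepting connections. The following is a minimal sketch using only the Python standard library; the helper name `yaagc_reachable` is hypothetical and not part of gr-apollo.

```python
import socket


def yaagc_reachable(host="localhost", port=19697, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened.

    Hypothetical helper for illustration; it only tests that something
    is listening, not that the listener is really yaAGC.
    """
    try:
        # create_connection raises OSError (refused, timed out, ...) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print("yaAGC reachable:", yaagc_reachable())
```

The check opens and immediately closes a connection, so yaAGC briefly sees a peripheral attach and detach.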

yaAGC communicates using 4-byte packets over TCP. Each packet encodes an I/O channel number (9 bits) and a data value (15 bits):

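Byte 0: [0x00 | Channel bits 8-3]
Byte 1: [0x40 | Channel bits 2-0][Value bits 14-12]
Byte 2: [0x80 | Value bits 11-6]
Byte 3: [0xC0 | Value bits 5-0]

The top two bits of each byte (00, 01, 10, 11) are a signature that marks the byte's position within the packet.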
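The packing can be sketched in a few lines of Python. This is an illustration of the layout rather than the gr-apollo implementation: the function names are hypothetical, and it assumes 0-indexed bit numbering with channel bits 8-3 in byte 0 and channel bits 2-0 in byte 1.

```python
def encode_packet(channel, value):
    """Pack a 9-bit channel and a 15-bit value into a 4-byte packet."""
    if not 0 <= channel <= 0x1FF:
        raise ValueError("channel must fit in 9 bits")
    if not 0 <= value <= 0x7FFF:
        raise ValueError("value must fit in 15 bits")
    return bytes([
        (channel >> 3) & 0x3F,                                     # 00 signature, channel high bits
        0x40 | ((channel & 0x07) << 3) | ((value >> 12) & 0x07),   # 01 signature
        0x80 | ((value >> 6) & 0x3F),                              # 10 signature
        0xC0 | (value & 0x3F),                                     # 11 signature
    ])


def decode_packet(packet):
    """Unpack a 4-byte packet into (channel, value), checking signatures."""
    if len(packet) != 4:
        raise ValueError("packet must be exactly 4 bytes")
    if [b & 0xC0 for b in packet] != [0x00, 0x40, 0x80, 0xC0]:
        raise ValueError("signature bits do not match")
    channel = ((packet[0] & 0x3F) << 3) | ((packet[1] >> 3) & 0x07)
    value = (
        ((packet[1] & 0x07) << 12)
        | ((packet[2] & 0x3F) << 6)
        | (packet[3] & 0x3F)
    )
    return channel, value


if __name__ == "__main__":
    pkt = encode_packet(0o45, 0o12345)
    print(pkt.hex(), decode_packet(pkt))
```

Because every byte carries its position in the top two bits, a reader that loses alignment can discard bytes until it sees one whose signature is 00 and resume from there.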

The telecom-relevant channels are:

| Channel (decimal) | Octal | Direction | Purpose |
|---|---|---|---|
| 37 | 045 | Uplink (in) | INLINK: ground commands to AGC |
| 47 | 057 | Downlink (out) | OUTLINK: digital downlink data |
| 28 | 034 | Downlink (out) | DNTM1: telemetry word high byte |
| 29 | 035 | Downlink (out) | DNTM2: telemetry word low byte |
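AGC documentation conventionally gives channel numbers in octal, so it is easy to mix the two bases up in code. A small sketch using Python octal literals keeps the values unambiguous; the constant and function names here are illustrative and are not taken from gr-apollo.

```python
# Telecom channels from the table above, written as octal literals.
INLINK = 0o45   # decimal 37
OUTLINK = 0o57  # decimal 47
DNTM1 = 0o34    # decimal 28
DNTM2 = 0o35    # decimal 29

TELECOM_CHANNELS = frozenset({INLINK, OUTLINK, DNTM1, DNTM2})


def is_telecom(channel):
    """Membership test of the kind a channel filter would apply."""
    return channel in TELECOM_CHANNELS


def as_octal(channel):
    """Format a channel number as three octal digits, e.g. 37 -> '045'."""
    return f"{channel:03o}"


if __name__ == "__main__":
    for ch in sorted(TELECOM_CHANNELS):
        print(ch, as_octal(ch), is_telecom(ch))
```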

The AGCBridgeClient connects to yaAGC in a background thread, auto-reconnects on disconnection, and delivers received packets through callbacks.

from apollo.agc_bridge import AGCBridgeClient


def on_packet(channel, value):
    print(f"Received: ch={channel} ({channel:03o}o) value={value:05o}o ({value})")


def on_status(state):
    print(f"Connection: {state}")


client = AGCBridgeClient(
    host="localhost",
    port=19697,
    on_packet=on_packet,
    on_status=on_status,
)
client.start()

The client runs a daemon thread that:

  • Connects to yaAGC at the specified host and port
  • Reads 4-byte packets continuously
  • Filters for telecom channels (034, 035, 045, 057) by default
  • Calls on_packet(channel, value) for each matching packet
  • Auto-reconnects with exponential backoff (0.5s to 30s) on disconnection

The connection state can be inspected at any time:

print(client.state)      # "connected", "connecting", or "disconnected"
print(client.connected)  # True / False

Send raw (channel, value) pairs directly:

# Send a value on channel 045 (INLINK)
client.send(channel=0o45, value=0o12345)

The send() method returns True if the packet was sent, False if not connected.
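Since send() reports failure through its return value rather than an exception, a multi-packet sequence should stop at the first False so that the AGC does not receive a partial command followed by unrelated keystrokes. A minimal sketch, assuming only the `send(channel, value) -> bool` behavior described above (the helper `send_all` is hypothetical):

```python
def send_all(client, commands):
    """Send (channel, value) pairs in order and return how many were sent.

    Stops at the first pair that send() rejects, so the caller can
    compare the result with len(commands) to detect a partial send.
    """
    sent = 0
    for channel, value in commands:
        if not client.send(channel, value):
            break
        sent += 1
    return sent
```

A caller would typically check `send_all(client, commands) == len(commands)` and retry the whole sequence once the client reports that it is connected again.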

By default, only telecom channels (034, 035, 045, 057) pass through. To receive all AGC I/O:

client = AGCBridgeClient(
    host="localhost",
    port=19697,
    channel_filter=None,  # Accept all channels
    on_packet=on_packet,
)

To shut the client down:

client.stop()  # Signals the rx thread and waits up to 5 seconds

The UplinkEncoder translates DSKY keystrokes and VERB-NOUN commands into the (channel, value) pairs that the AGC expects on channel 045.

from apollo.agc_bridge import AGCBridgeClient
from apollo.uplink_encoder import UplinkEncoder

client = AGCBridgeClient(host="localhost", port=19697)
client.start()

enc = UplinkEncoder()

# V37N00 -- select idle program (P00)
commands = enc.encode_verb_noun(37, 0)
# Returns: [(37, verb_key), (37, digit_3), (37, digit_7),
#           (37, noun_key), (37, digit_0), (37, digit_0),
#           (37, enter_key)]

for channel, value in commands:
    client.send(channel, value)

The encoder also provides methods for individual keys and shorter sequences:

enc = UplinkEncoder()

# Individual DSKY keys
enc.encode_verb(37)      # [VERB, 3, 7]
enc.encode_noun(1)       # [NOUN, 0, 1]
enc.encode_data(12345)   # [+, 1, 2, 3, 4, 5]
enc.encode_data(-500)    # [-, 0, 0, 5, 0, 0]
enc.encode_proceed()     # [ENTER]

# Full VERB-NOUN-ENTER sequence
enc.encode_verb_noun(16, 65)  # V16N65 ENTER

# Generic dispatcher
enc.encode_command("VERB", 37)
enc.encode_command("NOUN", 0)
enc.encode_command("DATA", 12345)
enc.encode_command("PROCEED")

Each method returns a list of (channel, value) tuples. The channel defaults to 37 (octal 045, INLINK).

The encoder uses the same 5-bit key codes as the real AGC:

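| Key | Octal Code | Decimal |
|---|---|---|
| VERB | 021 | 17 |
| NOUN | 037 | 31 |
| ENTER / PROCEED | 034 | 28 |
| RESET / KEY RELEASE | 022 | 18 |
| CLEAR | 036 | 30 |
| + | 032 | 26 |
| - | 033 | 27 |
| 0 | 020 | 16 |
| 1-7 | 001-007 | 1-7 |
| 8 | 010 | 8 |
| 9 | 011 | 9 |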
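To make the mapping concrete, the table can be written as a lookup and used to expand a VERB-NOUN-ENTER sequence into raw key codes. This is a sketch built only from the table values; `KEY_CODES` and `verb_noun_keys` are hypothetical names and do not describe how UplinkEncoder is implemented internally.

```python
# Key codes transcribed from the table above (octal literals).
KEY_CODES = {
    "VERB": 0o21,
    "NOUN": 0o37,
    "ENTER": 0o34,
    "RESET": 0o22,
    "CLEAR": 0o36,
    "+": 0o32,
    "-": 0o33,
    "0": 0o20,  # zero has its own code; it is not 000
    # Digits 1-9 use their own numeric value as the code (001-011 octal).
    **{str(d): d for d in range(1, 10)},
}


def verb_noun_keys(verb, noun):
    """Return the key codes for VERB vv NOUN nn ENTER, two digits each."""
    if not (0 <= verb <= 99 and 0 <= noun <= 99):
        raise ValueError("verb and noun must be two-digit numbers")
    keys = ["VERB", *f"{verb:02d}", "NOUN", *f"{noun:02d}", "ENTER"]
    return [KEY_CODES[k] for k in keys]


if __name__ == "__main__":
    print([f"{code:03o}" for code in verb_noun_keys(37, 0)])
```

Note that the digit 0 is sent as octal 020 rather than 000, which is why noun 00 expands to two copies of code 16.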

For use in GRC flowgraphs, the agc_bridge block wraps the TCP client and exposes message ports:

flowchart LR
    subgraph agc_bridge
        direction TB
        UP["uplink_data (input)"]
        DN["downlink_data (output)"]
        ST["status (output)"]
    end
    GR_UPLINK["Uplink PDUs"] --> UP
    DN --> GR_DOWNLINK["Downlink PDUs"]
    ST --> GR_STATUS["Status Messages"]
    UP -.->|TCP| AGC["yaAGC :19697"]
    AGC -.->|TCP| DN

The same block can be instantiated directly from Python:

from gnuradio import blocks, gr
from apollo.agc_bridge import agc_bridge
tb = gr.top_block()
bridge = agc_bridge(host="localhost", port=19697)
debug = blocks.message_debug()
# Monitor downlink data and status
tb.msg_connect(bridge, "downlink_data", debug, "print")
tb.msg_connect(bridge, "status", debug, "print")
tb.start()
# ... bridge runs until stopped
tb.stop()
tb.wait()

A complete pipeline decodes a Unified S-Band (USB) baseband signal into PCM frames, extracts the AGC telemetry, and forwards it to yaAGC:

from gnuradio import blocks, gr
from apollo.agc_bridge import agc_bridge
from apollo.constants import SAMPLE_RATE_BASEBAND
from apollo.downlink_decoder import downlink_decoder
from apollo.usb_downlink_receiver import usb_downlink_receiver
tb = gr.top_block()
src = blocks.file_source(gr.sizeof_gr_complex, "recording.cf32", repeat=False)
receiver = usb_downlink_receiver(output_format="raw")
decoder = downlink_decoder()
# Note: the bridge is created here, but this example does not wire its message ports
bridge = agc_bridge(host="localhost", port=19697)
tb.connect(src, receiver)
# Route AGC channel data through the downlink decoder
tb.msg_connect(receiver, "agc_data", decoder, "agc_data")
# Forward decoded downlink lists (for monitoring)
debug = blocks.message_debug()
tb.msg_connect(decoder, "downlink", debug, "print")
tb.run()

The AGCBridgeClient, which also backs the GR block, handles connection failures automatically:

| Event | Behavior |
|---|---|
| Initial connection fails | Retry with exponential backoff (0.5s, 1s, 2s, … up to 30s) |
| Connection lost mid-session | Close socket, reset to DISCONNECTED, begin retry loop |
| yaAGC restarted | Client reconnects within the backoff window |
| stop() called | Stop event signals the rx thread, socket is closed, thread joins within 5s |
| Callback raises exception | Exception is logged, processing continues |

The backoff resets to 0.5 seconds after each successful connection.
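The retry schedule can be sketched as a generator. This is an illustration of the behavior described above, not the client's actual code: the name `backoff_delays` is hypothetical, and the doubling factor is inferred from the 0.5s, 1s, 2s sequence.

```python
import itertools


def backoff_delays(initial=0.5, maximum=30.0, factor=2.0):
    """Yield retry delays in seconds, doubling up to a fixed ceiling."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * factor, maximum)


if __name__ == "__main__":
    # First eight delays of a fresh schedule:
    print(list(itertools.islice(backoff_delays(), 8)))
    # [0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

Resetting after a successful connection then amounts to discarding the generator and creating a new one, so the next failure starts again at 0.5 seconds.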

The AGC sends telemetry on channels 034 and 035 as pairs of bytes that together form 15-bit AGC words. The DownlinkEngine reassembles these:

from apollo.agc_bridge import AGCBridgeClient
from apollo.downlink_decoder import DownlinkEngine

engine = DownlinkEngine()


def on_packet(channel, value):
    snapshot = engine.feed_agc_word(channel, value)
    if snapshot is not None:
        print(f"Downlink list: {snapshot['list_name']}")
        print(f" Words: {snapshot['word_count']}")
        print(f" First word: {snapshot['words'][0]:05o}o")


client = AGCBridgeClient(
    host="localhost",
    port=19697,
    on_packet=on_packet,
)
client.start()

The engine buffers 400 words (one complete downlink snapshot) before emitting a decoded list. See the PCM Telemetry guide for details on interpreting the word contents.
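The buffer-then-emit pattern explains why `feed_agc_word` returns None for most calls. The sketch below shows only that pattern, under the assumption of a fixed 400-word snapshot; `WordBuffer` is a hypothetical class, and it omits everything else the real DownlinkEngine does, such as identifying the downlink list.

```python
class WordBuffer:
    """Accumulate 15-bit words and hand back a full snapshot when ready."""

    def __init__(self, size=400):
        self.size = size
        self._words = []

    def feed(self, word):
        """Add one word; return the list of words once `size` is reached."""
        self._words.append(word & 0x7FFF)  # keep only the 15 data bits
        if len(self._words) < self.size:
            return None
        snapshot, self._words = self._words, []  # emit and start over
        return snapshot


if __name__ == "__main__":
    buf = WordBuffer()
    results = [buf.feed(i) for i in range(400)]
    print(results[:-1].count(None), len(results[-1]))
```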