Project

General

Profile

Security #8584 » dns_forward_pointer_query.py

Jason Ish, 05/19/2026 08:42 PM

 
#!/usr/bin/env python3
"""
Send a DNS query whose QNAME uses a forward compression pointer.

Default layout:

offset 12: c0 12 QNAME: pointer to offset 18
offset 14: 00 01 00 01 QTYPE=A, QCLASS=IN
offset 18: 03 'www' ... 00 real target name, outside the question

The --layout second-question mode instead sets QDCOUNT=2 and makes the first
question's QNAME point forward to the second question's QNAME.

These packets are intentionally malformed/non-standard, but useful for testing
whether a resolver's DNS name parser accepts forward compression pointers.
"""

import argparse
import random
import socket
import struct
import sys

from scapy.all import DNS, IP, IPv6, Raw, UDP, hexdump, sr1 # type: ignore

QTYPES = {
"A": 1,
"NS": 2,
"CNAME": 5,
"SOA": 6,
"PTR": 12,
"MX": 15,
"TXT": 16,
"AAAA": 28,
"SRV": 33,
"HTTPS": 65,
"ANY": 255,
}


def encode_name(name: str) -> bytes:
name = name.rstrip(".")
if not name:
return b"\x00"
out = bytearray()
for label in name.split("."):
raw = label.encode("ascii")
if len(raw) > 63:
raise ValueError(f"label too long: {label!r}")
out.append(len(raw))
out.extend(raw)
out.append(0)
return bytes(out)


def parse_qtype(value: str) -> int:
upper = value.upper()
if upper in QTYPES:
return QTYPES[upper]
return int(value, 0)


def make_header(dns_id: int, rd: bool, qdcount: int = 1) -> bytes:
flags = 0x0100 if rd else 0x0000
return struct.pack("!HHHHHH", dns_id, flags, qdcount, 0, 0, 0)


def make_normal_query(name: str, qtype: int, qclass: int, dns_id: int, rd: bool) -> bytes:
return make_header(dns_id, rd) + encode_name(name) + struct.pack("!HH", qtype, qclass)


def make_forward_ptr_query(
name: str, qtype: int, qclass: int, dns_id: int, rd: bool, layout: str
) -> bytes:
header = make_header(dns_id, rd)

if layout == "whole":
# QNAME is only a pointer. The pointed-to name starts after QTYPE/QCLASS.
target_offset = 12 + 2 + 4
if target_offset > 0x3FFF:
raise ValueError("target offset too large for DNS compression pointer")
qname = struct.pack("!H", 0xC000 | target_offset)
return header + qname + struct.pack("!HH", qtype, qclass) + encode_name(name)

if layout == "suffix":
labels = name.rstrip(".").split(".")
if len(labels) < 2:
raise ValueError("suffix layout requires a name with at least two labels")
first = labels[0].encode("ascii")
if len(first) > 63:
raise ValueError(f"label too long: {labels[0]!r}")
suffix = ".".join(labels[1:])
prefix = bytes([len(first)]) + first
target_offset = 12 + len(prefix) + 2 + 4
if target_offset > 0x3FFF:
raise ValueError("target offset too large for DNS compression pointer")
qname = prefix + struct.pack("!H", 0xC000 | target_offset)
return header + qname + struct.pack("!HH", qtype, qclass) + encode_name(suffix)

if layout == "second-question":
# QDCOUNT=2. First question's QNAME is a forward pointer to the
# second question's QNAME, which starts immediately after the first
# question's QTYPE/QCLASS.
header = make_header(dns_id, rd, qdcount=2)
target_offset = 12 + 2 + 4
if target_offset > 0x3FFF:
raise ValueError("target offset too large for DNS compression pointer")
q1 = struct.pack("!H", 0xC000 | target_offset) + struct.pack("!HH", qtype, qclass)
q2 = encode_name(name) + struct.pack("!HH", qtype, qclass)
return header + q1 + q2

raise ValueError(f"unknown layout: {layout}")


def send_socket(payload: bytes, server: str, port: int, timeout: float, use_tcp: bool) -> bytes | None:
family = socket.AF_INET6 if ":" in server else socket.AF_INET
socktype = socket.SOCK_STREAM if use_tcp else socket.SOCK_DGRAM
with socket.socket(family, socktype) as s:
s.settimeout(timeout)
if use_tcp:
s.connect((server, port))
s.sendall(struct.pack("!H", len(payload)) + payload)
hdr = s.recv(2)
if len(hdr) != 2:
return None
length = struct.unpack("!H", hdr)[0]
data = bytearray()
while len(data) < length:
chunk = s.recv(length - len(data))
if not chunk:
break
data.extend(chunk)
return bytes(data)
s.sendto(payload, (server, port))
return s.recvfrom(65535)[0]


def send_scapy_l3(payload: bytes, server: str, port: int, timeout: float) -> bytes | None:
ip = IPv6(dst=server) if ":" in server else IP(dst=server)
pkt = ip / UDP(dport=port, sport=random.randint(1024, 65535)) / Raw(payload)
resp = sr1(pkt, timeout=timeout, verbose=False)
if resp is None or Raw not in resp:
return None
return bytes(resp[Raw].load)


def print_dns_header(data: bytes) -> None:
if len(data) < 12:
print(f"short response: {len(data)} bytes")
return
dns_id, flags, qd, an, ns, ar = struct.unpack("!HHHHHH", data[:12])
print(
f"id=0x{dns_id:04x} flags=0x{flags:04x} "
f"rcode={flags & 0x0f} qd={qd} an={an} ns={ns} ar={ar}"
)


def main() -> int:
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("server", help="resolver IP address")
ap.add_argument("name", help="query name, e.g. www.example.com")
ap.add_argument("--port", type=int, default=53)
ap.add_argument("--type", default="A", help="QTYPE name or number (default: A)")
ap.add_argument("--class", dest="qclass", type=int, default=1, help="QCLASS (default: 1/IN)")
ap.add_argument("--id", dest="dns_id", type=lambda x: int(x, 0), default=None)
ap.add_argument("--no-rd", action="store_true", help="clear recursion-desired bit")
ap.add_argument("--timeout", type=float, default=2.0)
ap.add_argument("--tcp", action="store_true", help="send over TCP using DNS length prefix")
ap.add_argument(
"--layout",
choices=("whole", "suffix", "second-question"),
default="whole",
help="forward pointer layout (default: whole)",
)
ap.add_argument(
"--mode",
choices=("socket", "scapy-l3"),
default="socket",
help="socket avoids raw-socket privileges; scapy-l3 sends IP/UDP/Raw with Scapy",
)
ap.add_argument("--also-normal", action="store_true", help="send a normal query first for comparison")
ap.add_argument("--decode", action="store_true", help="try Scapy DNS decode of responses")
args = ap.parse_args()

qtype = parse_qtype(args.type)
dns_id = args.dns_id if args.dns_id is not None else random.randint(0, 65535)
rd = not args.no_rd

tests: list[tuple[str, bytes]] = []
if args.also_normal:
tests.append(("normal", make_normal_query(args.name, qtype, args.qclass, dns_id, rd)))
dns_id = (dns_id + 1) & 0xFFFF
tests.append(
(
f"forward-pointer/{args.layout}",
make_forward_ptr_query(args.name, qtype, args.qclass, dns_id, rd, args.layout),
)
)

for label, payload in tests:
print(f"\n=== {label} query ===")
print(f"sending {len(payload)} DNS bytes to {args.server}:{args.port}")
hexdump(payload)
try:
if args.mode == "scapy-l3":
if args.tcp:
raise ValueError("--tcp is only supported with --mode socket")
resp = send_scapy_l3(payload, args.server, args.port, args.timeout)
else:
resp = send_socket(payload, args.server, args.port, args.timeout, args.tcp)
except socket.timeout:
resp = None
except OSError as e:
print(f"send failed: {e}")
return 2

if resp is None:
print("no response")
continue

print(f"\nresponse: {len(resp)} DNS bytes")
print_dns_header(resp)
hexdump(resp)
if args.decode:
print("\nScapy DNS decode:")
try:
DNS(resp).show()
except Exception as e:
print(f"decode failed: {e}")

return 0


if __name__ == "__main__":
sys.exit(main())
(2-2/2)