#!/usr/bin/env python3 """ Touchstone gossip / split-view checker — independent, dependency-free. Fetches a recorder's checkpoint chain BOTH from touchstone.cv AND from the public Nostr relays it mirrors to, then cross-checks them. It detects: * FORK — two Nostr events for the same checkpoint with different roots (the server published two histories; relays kept both). * CONTRADICTION — what touchstone.cv serves for a checkpoint differs from what it published to Nostr. * HIDDEN — a checkpoint Touchstone published to Nostr but no longer serves. * STALE — a checkpoint past the anchor horizon (default 30 min, --horizon-min) with no OTS submission, when this recorder otherwise anchors via OTS: its head is not on its way to Bitcoin (withheld, or an outage), so it can't win a tie-break and shouldn't be relied on. (Bitcoin *confirmation* legitimately takes hours and is NOT flagged — only a missing submission.) * UNMIRRORED — a served checkpoint with no Nostr event yet (info; may be recent). Also verifies every Nostr event's BIP340 signature against Touchstone's declared key (so a relay can't forge one) and the append-only chain of the served feed. Detection is not resolution: when a fork/contradiction is found, two valid-looking heads exist at the same seq and a reader still needs a tie-breaker. The only ordering the server can't forge is the OTS→Bitcoin commitment, so this tool also RESOLVES the fork — of the conflicting heads, the canonical one is whichever was committed to Bitcoin at the lowest block height. A head with no visible anchor can't claim priority over an anchored one; if no head is anchored yet the fork is flagged UNRESOLVED. Usage: python3 gossip_check.py [--base https://touchstone.cv] python3 gossip_check.py --selftest # offline unit tests, no network Exit: 0 consistent · 1 fork/contradiction/hidden · 2 incomplete (couldn't fetch) Deps: none (Python 3 stdlib only). """ import sys, json, hashlib, socket, ssl, base64, secrets, struct, time import urllib.request from urllib.parse import urlparse # ── BIP340 verification (vendored) ──────────────────────────────────────── p = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F n = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141 G = (0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798, 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8) def tagged_hash(tag, msg): th = hashlib.sha256(tag.encode()).digest() return hashlib.sha256(th + th + msg).digest() def point_add(P1, P2): if P1 is None: return P2 if P2 is None: return P1 if P1[0] == P2[0] and P1[1] != P2[1]: return None if P1 == P2: lam = (3 * P1[0] * P1[0] * pow(2 * P1[1], p - 2, p)) % p else: lam = ((P2[1] - P1[1]) * pow(P2[0] - P1[0], p - 2, p)) % p x3 = (lam * lam - P1[0] - P2[0]) % p return (x3, (lam * (P1[0] - x3) - P1[1]) % p) def point_mul(P, k): R = None for i in range(256): if (k >> i) & 1: R = point_add(R, P) P = point_add(P, P) return R def lift_x(x): if x >= p: return None c = (pow(x, 3, p) + 7) % p y = pow(c, (p + 1) // 4, p) if pow(y, 2, p) != c: return None return (x, y if y % 2 == 0 else p - y) def schnorr_verify(msg32, pubkey32, sig64): if len(pubkey32) != 32 or len(sig64) != 64: return False P = lift_x(int.from_bytes(pubkey32, "big")) if P is None: return False r = int.from_bytes(sig64[0:32], "big"); s = int.from_bytes(sig64[32:64], "big") if r >= p or s >= n: return False e = int.from_bytes(tagged_hash("BIP0340/challenge", sig64[0:32] + pubkey32 + msg32), "big") % n R = point_add(point_mul(G, s), point_mul(P, n - e)) return R is not None and R[1] % 2 == 0 and R[0] == r def event_id_ok(ev): serial = json.dumps([0, ev["pubkey"], ev["created_at"], ev["kind"], ev["tags"], ev["content"]], separators=(",", ":"), ensure_ascii=False) return hashlib.sha256(serial.encode("utf-8")).hexdigest() == ev["id"] def event_sig_ok(ev): try: return event_id_ok(ev) and schnorr_verify(bytes.fromhex(ev["id"]), bytes.fromhex(ev["pubkey"]), bytes.fromhex(ev["sig"])) except Exception: return False # ── fetch helpers ───────────────────────────────────────────────────────── def http_json(url): with urllib.request.urlopen(url, timeout=12) as r: return json.load(r) def relay_query(relay, filt, timeout=8): u = urlparse(relay) s = socket.create_connection((u.hostname, u.port or 443), timeout=timeout) if (u.scheme or "wss") == "wss": s = ssl.create_default_context().wrap_socket(s, server_hostname=u.hostname) key = base64.b64encode(secrets.token_bytes(16)).decode() s.sendall((f"GET {u.path or '/'} HTTP/1.1\r\nHost: {u.hostname}\r\nUpgrade: websocket\r\n" f"Connection: Upgrade\r\nSec-WebSocket-Key: {key}\r\nSec-WebSocket-Version: 13\r\n\r\n").encode()) resp = b""; s.settimeout(timeout) while b"\r\n\r\n" not in resp: c = s.recv(1024) if not c: break resp += c if b" 101 " not in resp.split(b"\r\n", 1)[0]: s.close(); return [] msg = json.dumps(["REQ", "g", filt]).encode() mask = secrets.token_bytes(4); h = bytearray([0x81]) if len(msg) < 126: h.append(0x80 | len(msg)) else: h.append(0x80 | 126); h += struct.pack(">H", len(msg)) h += mask; h += bytes(b ^ mask[i % 4] for i, b in enumerate(msg)); s.sendall(bytes(h)) out, buf, end = [], b"", time.time() + timeout while time.time() < end: try: s.settimeout(max(0.1, end - time.time())); chunk = s.recv(8192) except Exception: break if not chunk: break buf += chunk while len(buf) >= 2: ln = buf[1] & 0x7f; off = 2 if ln == 126: if len(buf) < 4: break ln = struct.unpack(">H", buf[2:4])[0]; off = 4 elif ln == 127: if len(buf) < 10: break ln = struct.unpack(">Q", buf[2:10])[0]; off = 10 if len(buf) < off + ln: break pl = buf[off:off + ln].decode("utf-8", "replace"); buf = buf[off + ln:] try: m = json.loads(pl) except Exception: continue if isinstance(m, list) and m[0] == "EVENT" and len(m) >= 3: out.append(m[2]) elif isinstance(m, list) and m[0] == "EOSE": end = 0 s.close(); return out # ── anchor presence / timing ─────────────────────────────────────────────── def _has_ots_submission(cp): """True once an OTS anchor exists (created at submission time, before Bitcoin).""" return any(a.get("method") == "ots" for a in (cp.get("anchors") or [])) def _parse_iso(s): """ISO-8601 (with trailing Z) → epoch seconds, or None.""" if not s: return None try: from datetime import datetime return datetime.fromisoformat(s.replace("Z", "+00:00")).timestamp() except Exception: return None # ── fork resolution by external Bitcoin ordering ─────────────────────────── def _ots_height(cp): """Confirmed OTS→Bitcoin block height of a served checkpoint, or None.""" for a in cp.get("anchors", []) or []: if a.get("method") == "ots" and a.get("status") == "confirmed": try: blob = json.loads(a.get("token_blob") or "{}") except Exception: blob = {} if blob.get("bitcoin_height"): return int(blob["bitcoin_height"]) return None def resolve_fork(cid, heads, head_height): """Tie-break conflicting heads at one checkpoint by external Bitcoin ordering. head_height maps head_hash -> (block_height, source_checkpoint_id) for every head whose OTS→Bitcoin commitment is visible here. The canonical head is whichever was committed at the lowest block — an ordering no party in this system can forge. Returns verdict line(s). Honest about partial knowledge: a head whose anchor isn't visible can't be ranked, only checked against its own .ots receipt. """ known = sorted(((h, head_height[h][0]) for h in heads if h in head_height), key=lambda t: t[1]) unknown = [h for h in heads if h not in head_height] out = [] if len(known) >= 2: win_h, win_b = known[0] out.append(f"↳ tie-break #{cid}: head {win_h[:12]}… is CANONICAL — earliest Bitcoin commitment (block {win_b}).") for h, b in known[1:]: out.append(f" head {h[:12]}… committed later (block {b}) ⇒ losing fork.") for h in unknown: out.append(f" head {h[:12]}… has no Bitcoin anchor visible here ⇒ cannot predate block {win_b}; losing unless its .ots receipt shows an earlier block.") elif len(known) == 1: h, b = known[0] out.append(f"↳ tie-break #{cid}: head {h[:12]}… is the only Bitcoin-committed candidate here (block {b}).") for u in unknown: out.append(f" head {u[:12]}… has no anchor visible here — if its .ots receipt predates block {b} it is canonical instead; otherwise the block-{b} head wins.") else: out.append(f"↳ tie-break #{cid}: UNRESOLVED — no competing head is Bitcoin-anchored yet (or receipts not public). Re-run after OTS confirmation.") return out # ── cross-check (pure; testable without network) ────────────────────────── def analyze(served, by_cp, base="touchstone.cv", nostr_heights=None, now=None, horizon_sec=1800): """served: {id: checkpoint}; by_cp: {id: [(event_id, content), …]}; nostr_heights: {head_hash: (block_height, checkpoint_id)} read from the Nostr Bitcoin-anchor mirror. now/horizon_sec drive the validity-horizon staleness check. → (problems, info_lines, resolutions). resolutions hold the Bitcoin-ordering tie-break for any fork/contradiction detected.""" problems, info, resolutions = [], [], [] # Heads whose Bitcoin commitment we can see: from the served feed's anchors, AND # from the Nostr Bitcoin-anchor mirror. The Nostr copy is independently held — it # works even for a head the server has stopped serving — so it takes precedence. head_height = {} for cp in served.values(): h = _ots_height(cp) if h is not None: head_height[cp["head_hash"]] = (h, cp["id"]) for head, (h, cid) in (nostr_heights or {}).items(): head_height[head] = (h, cid) # FORK: two events for the same checkpoint with different (root, head). for cid, lst in by_cp.items(): roots = {(c["merkle_root"], c["head_hash"]) for _, c in lst} if len(roots) > 1: problems.append(f"FORK at checkpoint #{cid}: {len(roots)} distinct roots on Nostr — " + ", ".join(f"{eid[:10]}…={c['merkle_root'][:12]}…" for eid, c in lst)) resolutions += resolve_fork(cid, {c["head_hash"] for _, c in lst}, head_height) # CONTRADICTION / HIDDEN / UNMIRRORED vs the served feed. for cid, cp in served.items(): lst = by_cp.get(cid) if not lst: info.append(f"[i] checkpoint #{cid}: served, not yet on Nostr (UNMIRRORED)") elif not any(c["merkle_root"] == cp["merkle_root"] and c["head_hash"] == cp["head_hash"] for _, c in lst): problems.append(f"CONTRADICTION at #{cid}: served root {cp['merkle_root'][:12]}… not among Nostr roots") heads = {cp["head_hash"]} | {c["head_hash"] for _, c in lst} resolutions += resolve_fork(cid, heads, head_height) else: info.append(f"[✓] checkpoint #{cid}: served root matches the Nostr mirror") for cid in by_cp: if cid not in served: problems.append(f"HIDDEN checkpoint #{cid}: on Nostr but not served by {base}") # Append-only chain of the served feed. order = sorted(served.values(), key=lambda c: c["seq_start"]) chain_ok = True for a, b in zip(order, order[1:]): if b["seq_start"] != a["seq_end"] + 1 or b.get("prev_checkpoint_hash") != a["head_hash"]: chain_ok = False problems.append(f"CHAIN break between #{a['id']} and #{b['id']} (not contiguous / not hash-linked)") if order: info.append(f"[{'✓' if chain_ok else '✗'}] served checkpoint chain is append-only") # Validity horizon: you cannot verify "never anchored", but you can verify "expected # by T, still absent". If this recorder anchors via OTS at all, a checkpoint older than # the horizon with no OTS submission is a positive red flag — its head is not on its way # to Bitcoin (withheld, or an anchoring outage) — not a neutral absence. (Submission is # near-instant; Bitcoin *confirmation* takes hours and is deliberately not flagged here.) if now is not None: ots_used = any(_has_ots_submission(cp) for cp in served.values()) if ots_used: for cp in served.values(): ts = _parse_iso(cp.get("created_at")) if ts is None: continue age = now - ts if age > horizon_sec and not _has_ots_submission(cp): problems.append( f"STALE checkpoint #{cp['id']}: {int(age // 60)} min old, past the " f"{int(horizon_sec // 60)}-min anchor horizon with no OTS submission — its head " f"is not on its way to Bitcoin (withheld, or an anchoring outage); don't rely on it.") return problems, info, resolutions def _selftest(): """Offline unit tests for the tie-break + detection logic.""" # two anchored heads: lower block is canonical, higher is a losing fork out = resolve_fork(1, {"Ha", "Hb"}, {"Ha": (955100, 1), "Hb": (955200, 1)}) assert any("Ha"[:12] in l and "CANONICAL" in l for l in out), out assert any("955100" in l for l in out) and any("losing fork" in l for l in out), out # one anchored vs one unseen anchor: partial, must not over-claim out = resolve_fork(2, {"Ha", "Hc"}, {"Ha": (955100, 1)}) assert any("only Bitcoin-committed" in l for l in out), out assert any("no anchor visible" in l for l in out), out # neither anchored yet: unresolved out = resolve_fork(3, {"Hx", "Hy"}, {}) assert any("UNRESOLVED" in l for l in out), out # analyze: a contradiction where the served head is Bitcoin-anchored served = {1: {"id": 1, "seq_start": 0, "seq_end": 0, "merkle_root": "R2", "head_hash": "H2", "prev_checkpoint_hash": None, "anchors": [{"method": "ots", "status": "confirmed", "token_blob": json.dumps({"bitcoin_height": 955300})}]}} by_cp = {1: [("e1", {"recorder": "r", "checkpoint_id": 1, "merkle_root": "R1", "head_hash": "H1"})]} pr, _i, res = analyze(served, by_cp) assert any("CONTRADICTION" in p for p in pr), pr assert any("955300" in l for l in res), res # analyze: a pure Nostr fork with no anchors → detected + UNRESOLVED by_cp2 = {5: [("e1", {"recorder": "r", "checkpoint_id": 5, "merkle_root": "Ra", "head_hash": "Ha"}), ("e2", {"recorder": "r", "checkpoint_id": 5, "merkle_root": "Rb", "head_hash": "Hb"})]} pr2, _i2, res2 = analyze({}, by_cp2) assert any("FORK" in p for p in pr2), pr2 assert any("UNRESOLVED" in l for l in res2), res2 # clean: served matches Nostr, nothing flagged served3 = {1: {"id": 1, "seq_start": 0, "seq_end": 0, "merkle_root": "R1", "head_hash": "H1", "prev_checkpoint_hash": None, "anchors": []}} by_cp3 = {1: [("e1", {"recorder": "r", "checkpoint_id": 1, "merkle_root": "R1", "head_hash": "H1"})]} pr3, _i3, res3 = analyze(served3, by_cp3) assert not pr3 and not res3, (pr3, res3) # Nostr-sourced heights resolve a fork even when the SERVER serves no anchor for # the competing heads (the boundary the Bitcoin-anchor mirror closes). by_cp4 = {7: [("e1", {"recorder": "r", "checkpoint_id": 7, "merkle_root": "Ra", "head_hash": "Hlow"}), ("e2", {"recorder": "r", "checkpoint_id": 7, "merkle_root": "Rb", "head_hash": "Hhigh"})]} nh = {"Hlow": (955100, 7), "Hhigh": (955200, 7)} pr4, _i4, res4 = analyze({}, by_cp4, nostr_heights=nh) assert any("FORK" in p for p in pr4), pr4 assert any("CANONICAL" in l and "Hlow"[:12] in l and "955100" in l for l in res4), res4 # validity horizon: an OTS-using recorder with an old checkpoint lacking OTS submission → STALE now = 1_000_000_000 old = "1970-01-01T00:00:00Z" # ancient → far past any horizon anch = lambda has_ots: ([{"method": "ots", "status": "pending", "token_blob": "{}"}] if has_ots else [{"method": "self", "status": "confirmed", "token_blob": ""}]) served5 = { 1: {"id": 1, "seq_start": 0, "seq_end": 0, "head_hash": "H1", "merkle_root": "R1", "prev_checkpoint_hash": None, "created_at": old, "anchors": anch(True)}, 2: {"id": 2, "seq_start": 1, "seq_end": 1, "head_hash": "H2", "merkle_root": "R2", "prev_checkpoint_hash": "H1", "created_at": old, "anchors": anch(False)}, } by5 = {1: [("e", {"recorder": "r", "checkpoint_id": 1, "merkle_root": "R1", "head_hash": "H1"})], 2: [("e", {"recorder": "r", "checkpoint_id": 2, "merkle_root": "R2", "head_hash": "H2"})]} pr5, _i5, _r5 = analyze(served5, by5, now=now, horizon_sec=1800) assert any("STALE checkpoint #2" in p for p in pr5), pr5 assert not any("STALE checkpoint #1" in p for p in pr5), "cp#1 has OTS, must not be stale" # a recorder that never uses OTS must NOT be flagged stale (self-calibrating) served6 = {1: {"id": 1, "seq_start": 0, "seq_end": 0, "head_hash": "H1", "merkle_root": "R1", "prev_checkpoint_hash": None, "created_at": old, "anchors": anch(False)}} by6 = {1: [("e", {"recorder": "r", "checkpoint_id": 1, "merkle_root": "R1", "head_hash": "H1"})]} pr6, _i6, _r6 = analyze(served6, by6, now=now, horizon_sec=1800) assert not any("STALE" in p for p in pr6), pr6 print("gossip_check selftest: OK") # ── main ────────────────────────────────────────────────────────────────── def main(): if "--selftest" in sys.argv: _selftest(); sys.exit(0) args = [a for a in sys.argv[1:] if not a.startswith("--")] base = "https://touchstone.cv" for i, a in enumerate(sys.argv): if a == "--base" and i + 1 < len(sys.argv): base = sys.argv[i + 1].rstrip("/") if not args: print("usage: gossip_check.py [--base URL]"); sys.exit(2) rid = args[0] try: feed = http_json(f"{base}/.well-known/touchstone/checkpoints/{rid}") desc = http_json(f"{base}/.well-known/touchstone/nostr") except Exception as e: print(f"[!] could not fetch from {base}: {e}"); sys.exit(2) pubkey = desc.get("pubkey"); relays = desc.get("relays", []); kind = desc.get("kind", 1623) served = {cp["id"]: cp for cp in feed.get("checkpoints", [])} print(f"Touchstone gossip check — recorder {rid}") print(f" served: {len(served)} checkpoint(s) from {base}") print(f" mirror: Nostr pubkey {pubkey[:16]}… across {len(relays)} relay(s)\n") # Collect Touchstone's checkpoint events from the relays, sig-verified. events = {} # event_id -> event forged = 0 for relay in relays: try: evs = relay_query(relay, {"authors": [pubkey], "kinds": [kind], "#t": ["touchstone-checkpoint"], "limit": 1000}) except Exception: evs = [] for ev in evs: if ev.get("pubkey") != pubkey: continue if not event_sig_ok(ev): forged += 1 continue events[ev["id"]] = ev print(f" fetched {len(events)} signed checkpoint event(s) from relays" + (f" ({forged} invalid-sig ignored)" if forged else "")) # Group this recorder's events by checkpoint id. by_cp = {} for ev in events.values(): try: c = json.loads(ev["content"]) except Exception: continue if c.get("recorder") != rid: continue by_cp.setdefault(c.get("checkpoint_id"), []).append((ev["id"], c)) # Bitcoin commitments mirrored to Nostr: an independently-held source for the fork # tie-break that survives the server hiding a head. Sig-verified against the same key. nostr_heights, btc_events = {}, {} for relay in relays: try: evs = relay_query(relay, {"authors": [pubkey], "kinds": [kind], "#t": ["touchstone-bitcoin-anchor"], "limit": 1000}) except Exception: evs = [] for ev in evs: if ev.get("pubkey") == pubkey and event_sig_ok(ev): btc_events[ev["id"]] = ev for ev in btc_events.values(): try: c = json.loads(ev["content"]) except Exception: continue if c.get("recorder") == rid and c.get("bitcoin_height") and c.get("head_hash"): nostr_heights[c["head_hash"]] = (int(c["bitcoin_height"]), c.get("checkpoint_id")) if nostr_heights: print(f" fetched {len(nostr_heights)} Bitcoin commitment(s) from Nostr (independent fork tie-break)") horizon_min = 30 for i, a in enumerate(sys.argv): if a == "--horizon-min" and i + 1 < len(sys.argv): try: horizon_min = int(sys.argv[i + 1]) except ValueError: pass problems, info, resolutions = analyze(served, by_cp, base, nostr_heights, now=time.time(), horizon_sec=horizon_min * 60) for line in info: print(" " + line) print() if problems: print("\033[31mSPLIT VIEW / INCONSISTENCY DETECTED:\033[0m") for pr in problems: print(" ✗ " + pr) if resolutions: print("\n External Bitcoin-ordering tie-break (the one order the server can't forge):") for r in resolutions: print(" " + r) sys.exit(1) print("\033[32mCONSISTENT\033[0m — every served checkpoint matches an independently-relayed,") print("Touchstone-signed Nostr event; no fork, no contradiction, chain is append-only.") sys.exit(0) if __name__ == "__main__": main()