Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions sentience/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,25 @@ def _compute_hash(self, text: str) -> str:
"""Compute SHA256 hash of text."""
return hashlib.sha256(text.encode("utf-8")).hexdigest()

async def _best_effort_post_snapshot_digest(self, goal: str) -> str | None:
    """
    Best-effort post-action snapshot digest for tracing (async).

    Takes a small, screenshot-free snapshot after an action and hashes its
    URL + capture timestamp into a "sha256:<hex>" digest string.

    NOTE(review): a *synchronous* method with this exact name is defined
    immediately after this one in the same class. Because later class-body
    definitions rebind earlier names, the sync version will shadow this
    async one — rename or remove one of the two.

    Args:
        goal: Goal string of the action just performed; " (post)" is
            appended for the snapshot's goal field.

    Returns:
        A "sha256:..." digest string, or None on any failure — this helper
        is deliberately best-effort and must never raise.
    """
    try:
        # Keep the post-action snapshot cheap: small element limit, no screenshot.
        snap_opts = SnapshotOptions(
            limit=min(10, self.default_snapshot_limit),
            goal=f"{goal} (post)",
        )
        snap_opts.screenshot = False
        snap_opts.show_overlay = self.config.show_overlay if self.config else None
        post_snap = await snapshot_async(self.browser, snap_opts)
        if post_snap.status != "success":
            return None
        # The digest identifies the observed page state via URL + timestamp.
        digest_input = f"{post_snap.url}{post_snap.timestamp}"
        return f"sha256:{self._compute_hash(digest_input)}"
    except Exception:
        # Best-effort: tracing must not break the agent loop.
        return None

def _best_effort_post_snapshot_digest(self, goal: str) -> str | None:
"""
Best-effort post-action snapshot digest for tracing.
Expand Down
44 changes: 44 additions & 0 deletions sentience/agent_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,50 @@ async def snapshot(self, **kwargs: Any) -> Snapshot:
await self._handle_captcha_if_needed(self.last_snapshot, source="gateway")
return self.last_snapshot

async def sampled_snapshot(
    self,
    *,
    samples: int = 4,
    scroll_delta_y: float | None = None,
    settle_ms: int = 250,
    union_limit: int | None = None,
    restore_scroll: bool = True,
    **kwargs: Any,
) -> Snapshot:
    """
    Capture several snapshots while scrolling and merge them into a single
    "union snapshot".

    Meant for analysis/extraction on long or virtualized pages where one
    viewport snapshot cannot cover enough content.

    IMPORTANT:
    - Element bboxes in the merged result may not match the current viewport.
      Do NOT click merged elements unless you also scroll back to their
      position first.
    - `self.last_snapshot` is intentionally left untouched, so viewport-based
      verification loops are not confused by the union result.
    """
    # Legacy browser path: backend scroll ops are unavailable there, so the
    # best we can do is a single plain snapshot.
    if hasattr(self, "_legacy_browser") and hasattr(self, "_legacy_page"):
        return await self.snapshot(**kwargs)

    from .backends.snapshot import sampled_snapshot as backend_sampled_snapshot

    # Call-site kwargs take precedence over the runtime's default options.
    merged_options = {**self._snapshot_options.model_dump(exclude_none=True), **kwargs}

    return await backend_sampled_snapshot(
        self.backend,
        options=SnapshotOptions(**merged_options),
        samples=samples,
        scroll_delta_y=scroll_delta_y,
        settle_ms=settle_ms,
        union_limit=union_limit,
        restore_scroll=restore_scroll,
    )

async def evaluate_js(self, request: EvaluateJsRequest) -> EvaluateJsResult:
"""
Evaluate JavaScript expression in the active backend.
Expand Down
188 changes: 186 additions & 2 deletions sentience/backends/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from typing import TYPE_CHECKING, Any

from ..constants import SENTIENCE_API_URL
from ..models import Snapshot, SnapshotOptions
from ..models import Element, Snapshot, SnapshotOptions
from ..snapshot import (
_build_snapshot_payload,
_merge_api_result_with_local,
Expand Down Expand Up @@ -259,6 +259,182 @@ async def snapshot(
return await _snapshot_via_extension(backend, options)


def _normalize_ws(text: str) -> str:
return " ".join((text or "").split()).strip()


def _dedupe_key(el: Element) -> tuple:
    """
    Best-effort stable dedupe key across scroll-sampled snapshots.

    Notes:
    - IDs are not reliable across snapshots (virtualization can remount nodes).
    - BBox coordinates are viewport-relative and depend on scroll position.
    - Prefer href/name/text + approximate document position when available.
    """
    # Strongest signal first: a non-empty href uniquely identifies most links.
    href_val = (el.href or "").strip()
    if href_val:
        return ("href", href_val)

    # Accessible name + role is the next most stable pairing.
    label = _normalize_ws(el.name or "")
    if label:
        return ("role_name", el.role, label)

    body = _normalize_ws(el.text or "")
    y = el.doc_y
    has_y = isinstance(y, (int, float))
    # Bucket document-Y into 10px bands: more stable across scroll positions
    # than viewport-relative bbox.y.
    y_bucket = int(float(y) // 10) if has_y else None

    if body:
        if has_y:
            return ("role_text_docy", el.role, body[:120], y_bucket)
        return ("role_text", el.role, body[:120])

    # Fallback: role + approximate document position.
    if has_y:
        return ("role_docy", el.role, y_bucket)

    # Last resort (still dedupes within a single snapshot).
    return ("id", int(el.id))


def merge_snapshots(
    snaps: list[Snapshot],
    *,
    union_limit: int | None = None,
) -> Snapshot:
    """
    Merge multiple snapshots into a single "union snapshot" for analysis/extraction.

    Elements are deduped across snapshots via `_dedupe_key`; for each key the
    highest-quality representative is kept, and the result is ordered by
    document position when `doc_y` is available, else by first-seen order.

    Args:
        snaps: Snapshots in sampling order; the first one supplies the merged
            snapshot's url/viewport/diagnostics.
        union_limit: Optional cap on the merged element count; invalid values
            are silently ignored.

    Raises:
        ValueError: If `snaps` is empty.

    CRITICAL:
    - Element bboxes are viewport-relative to the scroll position at the time each snapshot
      was taken. Do NOT use merged elements for direct clicking unless you also scroll
      back to their position.
    """
    if not snaps:
        raise ValueError("merge_snapshots requires at least one snapshot")

    base = snaps[0]
    best_by_key: dict[tuple, Element] = {}
    first_seen_idx: dict[tuple, int] = {}

    # Keep the "best" representative per key:
    # - Prefer higher importance (usually means in-viewport at that sampling moment)
    # - Prefer having href/text/name (more useful for extraction)
    def _quality_score(e: Element) -> tuple:
        # NOTE(review): assumes e.importance is orderable (numeric) — confirm
        # against the Element model; a None importance would break comparison.
        has_href = 1 if (e.href or "").strip() else 0
        has_text = 1 if _normalize_ws(e.text or "") else 0
        has_name = 1 if _normalize_ws(e.name or "") else 0
        has_docy = 1 if isinstance(e.doc_y, (int, float)) else 0
        return (e.importance, has_href, has_text, has_name, has_docy)

    # Global element counter across all snapshots; records first-seen order.
    idx = 0
    for snap in snaps:
        for el in list(getattr(snap, "elements", []) or []):
            k = _dedupe_key(el)
            if k not in first_seen_idx:
                first_seen_idx[k] = idx
            prev = best_by_key.get(k)
            if prev is None or _quality_score(el) > _quality_score(prev):
                best_by_key[k] = el
            idx += 1

    merged: list[Element] = list(best_by_key.values())

    # Deterministic ordering: prefer document order when doc_y is available,
    # then fall back to "first seen" (stable for a given sampling sequence).
    def _sort_key(e: Element) -> tuple:
        doc_y = e.doc_y
        if isinstance(doc_y, (int, float)):
            # Document-ordered bucket; ties broken by higher importance first.
            return (0, float(doc_y), -int(e.importance))
        # Elements without doc_y sort after all positioned ones.
        return (1, float("inf"), first_seen_idx.get(_dedupe_key(e), 10**9))

    merged.sort(key=_sort_key)

    if union_limit is not None:
        try:
            lim = max(1, int(union_limit))
        except (TypeError, ValueError):
            # Invalid limit: treat as "no limit" rather than failing the merge.
            lim = None
        if lim is not None:
            merged = merged[:lim]

    # Construct a new Snapshot object with merged elements.
    # Keep base url/viewport/diagnostics, and drop screenshot by default to avoid confusion.
    data = base.model_dump()
    data["elements"] = [e.model_dump() for e in merged]
    data["screenshot"] = None
    return Snapshot(**data)


async def sampled_snapshot(
    backend: "BrowserBackend",
    *,
    options: SnapshotOptions | None = None,
    samples: int = 4,
    scroll_delta_y: float | None = None,
    settle_ms: int = 250,
    union_limit: int | None = None,
    restore_scroll: bool = True,
) -> Snapshot:
    """
    Take multiple snapshots while scrolling downward and return a merged union snapshot.

    Designed for long / virtualized results pages where a single viewport snapshot
    cannot cover enough relevant items.

    Args:
        backend: Browser backend to snapshot and scroll.
        options: Snapshot options applied to every sample (defaults constructed
            if None).
        samples: Number of snapshots to take; values <= 1 degrade to a single
            plain snapshot.
        scroll_delta_y: Pixels scrolled between samples; defaults to ~90% of
            the viewport height. Non-positive values are replaced with a
            positive default (this function only scrolls downward).
        settle_ms: Delay after each scroll so the page can settle/render.
        union_limit: Optional cap on merged element count (see merge_snapshots).
        restore_scroll: Restore the original scroll position afterwards
            (best-effort, even if sampling raised).

    NOTE(review): relies on a module-level `import asyncio` — confirm it is
    present at the top of this file, since the local import was removed from
    `_wait_for_extension` in this change.
    """
    if options is None:
        options = SnapshotOptions()

    k = max(1, int(samples))
    if k <= 1:
        # Single-sample degenerate case: no scrolling or merging needed.
        return await snapshot(backend, options=options)

    # Baseline scroll position (so we can restore it later) and viewport height.
    try:
        info = await backend.refresh_page_info()
        base_scroll_y = float(getattr(info, "scroll_y", 0.0) or 0.0)
        vh = float(getattr(info, "height", 800) or 800)
    except Exception:  # pylint: disable=broad-exception-caught
        # Best-effort defaults if page info is unavailable.
        base_scroll_y = 0.0
        vh = 800.0

    # Choose a conservative scroll delta if not provided.
    # ~90% of viewport height gives slight overlap between samples.
    delta = float(scroll_delta_y) if scroll_delta_y is not None else (vh * 0.9)
    if delta <= 0:
        delta = max(200.0, vh * 0.9)

    snaps: list[Snapshot] = []
    try:
        # Snapshot at current position.
        snaps.append(await snapshot(backend, options=options))

        for _i in range(1, k):
            try:
                # Scroll by wheel delta (plays nicer with sites that hook scroll events).
                await backend.wheel(delta_y=delta)
            except Exception:  # pylint: disable=broad-exception-caught
                # Fallback: direct scrollTo
                try:
                    cur = await backend.eval("window.scrollY")
                    await backend.call("(y) => window.scrollTo(0, y)", [float(cur) + delta])
                except Exception:  # pylint: disable=broad-exception-caught
                    # Both scroll mechanisms failed: stop sampling, merge what we have.
                    break

            if settle_ms > 0:
                await asyncio.sleep(float(settle_ms) / 1000.0)

            snaps.append(await snapshot(backend, options=options))
    finally:
        # Restore the original scroll position even if sampling raised.
        if restore_scroll:
            try:
                await backend.call("(y) => window.scrollTo(0, y)", [float(base_scroll_y)])
                if settle_ms > 0:
                    # Short, capped settle after restoring.
                    await asyncio.sleep(min(0.2, float(settle_ms) / 1000.0))
            except Exception:  # pylint: disable=broad-exception-caught
                pass

    return merge_snapshots(snaps, union_limit=union_limit)


async def _wait_for_extension(
backend: "BrowserBackend",
timeout_ms: int = 5000,
Expand All @@ -273,7 +449,6 @@ async def _wait_for_extension(
Raises:
RuntimeError: If extension not injected within timeout
"""
import asyncio
import logging

logger = logging.getLogger("sentience.backends.snapshot")
Expand Down Expand Up @@ -446,6 +621,15 @@ async def _snapshot_via_api(
# Re-raise validation errors as-is
raise
except Exception as e:
# Preserve structured gateway details when available.
try:
from ..snapshot import SnapshotGatewayError # type: ignore

if isinstance(e, SnapshotGatewayError):
raise
except Exception:
pass

# Fallback to local extension on API error
# This matches the behavior of the main snapshot function
raise RuntimeError(
Expand Down
Loading
Loading