mirror of https://tangled.org/ptr.pet/tapped

tapped

A Rust wrapper library for the tap ATProto sync utility.

tapped provides an idiomatic async Rust interface for spawning and communicating with a tap subprocess, making it easy to build applications that sync data from the ATProto network.

Features

  • Spawn and manage tap subprocesses with graceful shutdown
  • Strongly-typed configuration for all of tap's environment variables
  • Strongly-typed async Rust functions covering all of tap's HTTP API endpoints
  • WebSocket-based event channel with explicit per-event acknowledgment

Installation

Add to your Cargo.toml:

[dependencies]
tapped = "0.2"

You'll also need the tap binary. Build it from the indigo repository:

cd cmd/tap && go build

tapped has been most recently tested against:

tap version v0.0.0-20260120225912-12d69fa4d209-rev-12d69fa

Quick Start

use tapped::{TapHandle, TapConfig, Event};

#[tokio::main]
async fn main() -> tapped::Result<()> {
    let config = TapConfig::builder()
        .database_url("sqlite://tap.db")
        .collection_filter("app.bsky.feed.post")
        .build();

    // Spawn tap and connect
    let handle = TapHandle::spawn_default(config).await?;
    
    // Subscribe to events
    let (mut receiver, mut ack_sender) = handle.channel().await?;
    
    while let Ok((event, ack_id)) = receiver.recv().await {
        match event {
            Event::Record(record) => {
                println!("[{:?}] {}/{}",
                    record.action,
                    record.collection,
                    record.rkey
                );
            }
            Event::Identity(identity) => {
                println!("Identity: {} -> {}", identity.did, identity.handle);
            }
        }
        // Manual acknowledgment required
        ack_sender.ack(ack_id).await?;
    }

    Ok(())
}

Usage Patterns

Connect to Existing Instance

If you have a tap instance already running:

use tapped::TapClient;

let client = TapClient::new("http://localhost:2480")?;
client.health().await?;

Spawn with Custom Binary Path

use tapped::{TapProcess, TapConfig};

let config = TapConfig::builder()
    .database_url("sqlite://my-app.db")
    .build();

let mut process = TapProcess::spawn("/path/to/tap", config).await?;
let client = process.client()?;

// Use the client...

process.shutdown().await?;

Alternatively, TapHandle combines process management and client access in a single value:

use tapped::{TapHandle, TapConfig};

let config = TapConfig::builder()
    .database_url("sqlite://app.db")
    .full_network(false)
    .build();

let handle = TapHandle::spawn_default(config).await?;

// TapHandle derefs to TapClient, so you can call client methods directly
handle.health().await?;
let count = handle.repo_count().await?;
println!("Tracking {} repos", count);
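
The "derefs to" relationship mentioned in the snippet above can be illustrated with a minimal, self-contained sketch. The `Handle` and `Client` types and the `endpoint` method here are hypothetical stand-ins, not tapped's actual definitions; the only point is how `std::ops::Deref` lets methods defined on an inner client be called directly on the outer handle.

```rust
use std::ops::Deref;

/// Hypothetical stand-in for an HTTP client (not the real `TapClient`).
struct Client {
    base_url: String,
}

impl Client {
    /// Join the base URL and a path with exactly one slash between them.
    fn endpoint(&self, path: &str) -> String {
        format!(
            "{}/{}",
            self.base_url.trim_end_matches('/'),
            path.trim_start_matches('/')
        )
    }
}

/// Hypothetical stand-in for a handle that owns process state and a client.
struct Handle {
    pid: u32,
    client: Client,
}

impl Deref for Handle {
    type Target = Client;

    fn deref(&self) -> &Client {
        &self.client
    }
}

fn main() {
    let handle = Handle {
        pid: 4242,
        client: Client { base_url: "http://localhost:2480".to_string() },
    };
    // `endpoint` is defined on `Client`; auto-deref makes it callable on `Handle`.
    println!("pid {}: {}", handle.pid, handle.endpoint("/health"));
}
```

With this pattern a function that takes `&Client` also accepts `&Handle` through deref coercion, so code written against the client alone keeps working when a handle is passed in.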

Configuration Options

use tapped::{TapConfig, LogLevel};
use std::time::Duration;

let config = TapConfig::builder()
    // Database
    .database_url("sqlite://tap.db")
    .max_db_conns(10)
    
    // Network
    .bind("127.0.0.1:2480")
    .relay_url("wss://bsky.network".parse().unwrap())
    .plc_url("https://plc.directory".parse().unwrap())
    
    // Filtering
    .signal_collection("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.like")
    .full_network(false)
    
    // Performance
    .firehose_parallelism(10)
    .resync_parallelism(5)
    .outbox_parallelism(10)
    .outbox_capacity(10000)
    
    // Timeouts
    .repo_fetch_timeout(Duration::from_secs(30))
    .startup_timeout(Duration::from_secs(60))
    .shutdown_timeout(Duration::from_secs(10))
    
    // Logging
    .log_level(LogLevel::Info)
    
    .build();
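
To make the builder's behaviour concrete, here is a rough sketch of how a typed builder might collect repeated `collection_filter` calls and render only the options that were set as environment variables for the subprocess. Everything in it is an assumption for illustration: the `Config` and `ConfigBuilder` types are simplified stand-ins for `TapConfig`, and the variable names (`TAP_DATABASE_URL` and so on) are guesses rather than a confirmed list of what tap reads.

```rust
/// Hypothetical, simplified config; the real `TapConfig` has many more fields.
#[derive(Debug, Default)]
struct Config {
    database_url: Option<String>,
    collection_filters: Vec<String>,
    full_network: Option<bool>,
}

#[derive(Debug, Default)]
struct ConfigBuilder {
    inner: Config,
}

impl ConfigBuilder {
    fn database_url(mut self, url: &str) -> Self {
        self.inner.database_url = Some(url.to_string());
        self
    }

    /// Each call appends one filter instead of replacing the previous one.
    fn collection_filter(mut self, nsid: &str) -> Self {
        self.inner.collection_filters.push(nsid.to_string());
        self
    }

    fn full_network(mut self, on: bool) -> Self {
        self.inner.full_network = Some(on);
        self
    }

    fn build(self) -> Config {
        self.inner
    }
}

impl Config {
    /// Render the options that were set as (name, value) pairs.
    /// The variable names are illustrative assumptions.
    fn to_env(&self) -> Vec<(String, String)> {
        let mut env = Vec::new();
        if let Some(url) = &self.database_url {
            env.push(("TAP_DATABASE_URL".to_string(), url.clone()));
        }
        if !self.collection_filters.is_empty() {
            env.push((
                "TAP_COLLECTION_FILTERS".to_string(),
                self.collection_filters.join(","),
            ));
        }
        if let Some(on) = self.full_network {
            env.push(("TAP_FULL_NETWORK".to_string(), on.to_string()));
        }
        env
    }
}

fn main() {
    let config = ConfigBuilder::default()
        .database_url("sqlite://tap.db")
        .collection_filter("app.bsky.feed.post")
        .collection_filter("app.bsky.feed.like")
        .full_network(false)
        .build();

    for (name, value) in config.to_env() {
        println!("{name}={value}");
    }
}
```

Keeping unset options as `None` means the subprocess falls back to its own defaults instead of receiving values invented by the wrapper.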

Working with Events

Events must be manually acknowledged:

use tapped::{Event, RecordAction};

let (mut receiver, mut ack_sender) = client.channel().await?;

while let Ok((event, ack_id)) = receiver.recv().await {
    match event {
        Event::Record(record) => {
            match record.action {
                RecordAction::Create => {
                    // Access the raw JSON as a string
                    if let Some(json) = record.record_as_str() {
                        println!("Raw JSON: {}", json);
                    }

                    // Or deserialize to a specific type
                    // let post: MyPostType = record.deserialize_as()?;
                }
                RecordAction::Update => { /* ... */ }
                RecordAction::Delete => { /* ... */ }
                _ => {}
            }
        }
        Event::Identity(identity) => {
            println!("{} is now @{}", identity.did, identity.handle);
        }
    }
    // Ack must be sent manually
    ack_sender.ack(ack_id).await?;
}
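
The ordering in the loop above (handle first, acknowledge second) can be sketched without any network at all. The following is a minimal model using standard-library channels; the `Event` enum and `drain_and_ack` function are hypothetical and only mirror the shape of the loop, not tapped's real types. The presumed benefit of this ordering is that an event whose handling never finishes is also never acknowledged.

```rust
use std::sync::mpsc;

/// Hypothetical event type that mirrors the two variants used above.
#[derive(Debug)]
enum Event {
    Record { collection: String, rkey: String },
    Identity { did: String, handle: String },
}

/// Handle every queued event, sending its id on `acks` only after handling.
/// Returns the ids that were acknowledged, in order.
fn drain_and_ack(events: mpsc::Receiver<(Event, u64)>, acks: mpsc::Sender<u64>) -> Vec<u64> {
    let mut acked = Vec::new();
    // Iterating a Receiver blocks until every Sender has been dropped.
    for (event, id) in events {
        match &event {
            Event::Record { collection, rkey } => println!("record {collection}/{rkey}"),
            Event::Identity { did, handle } => println!("{did} is now @{handle}"),
        }
        // Acknowledge last, so a failure above would leave the event unacked.
        if acks.send(id).is_ok() {
            acked.push(id);
        }
    }
    acked
}

fn main() {
    let (event_tx, event_rx) = mpsc::channel();
    let (ack_tx, ack_rx) = mpsc::channel();

    event_tx
        .send((
            Event::Record { collection: "app.bsky.feed.post".into(), rkey: "3kabc".into() },
            1,
        ))
        .unwrap();
    event_tx
        .send((
            Event::Identity { did: "did:plc:abc123".into(), handle: "alice.example".into() },
            2,
        ))
        .unwrap();
    drop(event_tx); // close the channel so the drain loop terminates

    let acked = drain_and_ack(event_rx, ack_tx);
    let seen: Vec<u64> = ack_rx.iter().collect();
    println!("acked {acked:?}, ack channel saw {seen:?}");
}
```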

Managing Repositories

// Add repos to track
client.add_repos(&["did:plc:abc123", "did:plc:def456"]).await?;

// Remove repos
client.remove_repos(&["did:plc:abc123"]).await?;

// Get info about a specific repo
let info = client.repo_info("did:plc:def456").await?;
println!("State: {:?}, Records: {}", info.state, info.records);

// Resolve a DID to its document
let doc = client.resolve_did("did:plc:def456").await?;
println!("Handles: {:?}", doc.also_known_as);
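
When DIDs come from user input, it may help to reject obviously malformed strings before sending them to the API. The helper below is a hypothetical sketch, not part of tapped: it only checks the generic `did:<method>:<identifier>` shape and does not verify method-specific rules or whether the DID actually resolves.

```rust
/// Split a DID into (method, identifier), or return None if the string
/// does not have the basic `did:<method>:<identifier>` shape.
fn split_did(did: &str) -> Option<(&str, &str)> {
    let rest = did.strip_prefix("did:")?;
    let (method, id) = rest.split_once(':')?;
    // DID method names consist of lowercase ASCII letters and digits.
    let method_ok = !method.is_empty()
        && method
            .chars()
            .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit());
    if method_ok && !id.is_empty() {
        Some((method, id))
    } else {
        None
    }
}

fn main() {
    let candidates = ["did:plc:abc123", "did:web:example.com", "plc:abc123", "did:plc:"];
    // Keep only the strings that look like DIDs.
    let valid: Vec<&str> = candidates
        .iter()
        .copied()
        .filter(|d| split_did(d).is_some())
        .collect();
    println!("{valid:?}");
}
```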

Checking Stats

let repos = client.repo_count().await?;
let records = client.record_count().await?;
let outbox = client.outbox_buffer().await?;
let resync = client.resync_buffer().await?;
let cursors = client.cursors().await?;

println!("Tracking {} repos with {} records", repos, records);
println!("Outbox buffer: {}, Resync buffer: {}", outbox, resync);
println!("Firehose cursor: {:?}", cursors.firehose);

Example: Syncing standard.site Records with Schema Generation and Validation

The repository includes a complete example demonstrating how to sync and validate ATProto records using tapped together with the jacquard crates.

The jacquard ecosystem provides runtime validation of records against their lexicon constraints, and the ability to generate Rust structs from lexicon JSON files.

tapped/
├── tapped/                 # The main tapped library
├── lexicons-example/       # Generated types from lexicon schemas
│   ├── lexicons/           # Source lexicon JSON files
│   │   ├── site.standard.publication.json
│   │   ├── site.standard.document.json
│   │   └── ...
│   └── src/                # Generated Rust code
└── standard-site-sync/     # Example binary using both packages

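The Rust code in lexicons-example/src was generated from the lexicon JSON files like so:

# Install the code generator
cargo install jacquard-lexgen

# Generate Rust types from the lexicon JSON files
jacquard-codegen -i lexicons-example/lexicons -o lexicons-example/src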

This produces strongly-typed structs with built-in validation. For example, the type generated from the site.standard.publication lexicon can be used like this:

use lexicons_example::site_standard::publication::Publication;

// Deserialize from JSON
let publication: Publication = serde_json::from_str(json)?;

// Validate against lexicon constraints (max length, grapheme limits, etc.)
publication.validate()?;

// Access typed fields
println!("Name: {}", publication.name.as_str());
println!("URL: {}", publication.url.as_str());
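
As a rough idea of what one such constraint check involves, the sketch below hand-rolls a `maxLength` check. It is not jacquard's implementation; `check_max_length` and `LengthError` are hypothetical names. In the Lexicon specification, `maxLength` on a string is measured in UTF-8 bytes, which is what `str::len` returns in Rust. `maxGraphemes` counts grapheme clusters instead, which the standard library cannot do, so it is left out here.

```rust
/// Hypothetical error type for a failed length constraint.
#[derive(Debug, PartialEq)]
enum LengthError {
    TooLong { max_bytes: usize, actual_bytes: usize },
}

/// Check a string against a lexicon-style `maxLength`, measured in UTF-8 bytes.
fn check_max_length(value: &str, max_bytes: usize) -> Result<(), LengthError> {
    let actual_bytes = value.len(); // `str::len` is the byte length, not the char count
    if actual_bytes > max_bytes {
        Err(LengthError::TooLong { max_bytes, actual_bytes })
    } else {
        Ok(())
    }
}

fn main() {
    // "héllo" has 5 characters but 6 bytes, because 'é' takes 2 bytes in UTF-8.
    println!("{:?}", check_max_length("hello", 5));
    println!("{:?}", check_max_length("héllo", 5));
}
```

The byte-versus-character distinction is the usual source of surprises: a string that looks short enough can still exceed a byte-based limit once it contains non-ASCII text.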

For more detail, see process_record_event in the standard-site-sync example's main.rs.

License

MIT
