90-008/tapped
mirror of https://tangled.org/ptr.pet/tapped
tapped
A Rust wrapper library for the tap ATProto sync utility.
tapped provides an idiomatic async Rust interface for spawning and communicating with a tap subprocess, making it easy to build applications that sync data from the ATProto network.
Features
- Spawn and manage
tapsubprocesses with graceful shutdown - Strongly-typed configuration for all tap envvars
- Strongly-typed async Rust functions covering all of tap's HTTP API endpoints
- WebSocket-based event channel with automatic acknowledgment
Installation
Add to your Cargo.toml:
[dependencies]
tapped = "0.2"You'll also need the tap binary. Build it from the indigo repository:
cd cmd/tap && go buildtapped has been most recently tested against:
tap version v0.0.0-20260120225912-12d69fa4d209-rev-12d69fa
Quick Start
use tapped::{TapHandle, TapConfig, Event};
#[tokio::main]
async fn main() -> tapped::Result<()> {
let config = TapConfig::builder()
.database_url("sqlite://tap.db")
.collection_filter("app.bsky.feed.post")
.build();
// Spawn tap and connect
let handle = TapHandle::spawn_default(config).await?;
// Subscribe to events
let (mut receiver, mut ack_sender) = handle.channel().await?;
while let Ok((event, ack_id)) = receiver.recv().await {
match event {
Event::Record(record) => {
println!("[{:?}] {}/{}",
record.action,
record.collection,
record.rkey
);
}
Event::Identity(identity) => {
println!("Identity: {} -> {}", identity.did, identity.handle);
}
}
// Manual acknowledgment required
ack_sender.ack(ack_id).await?;
}
Ok(())
}Usage Patterns
Connect to Existing Instance
If you have a tap instance already running:
use tapped::TapClient;
let client = TapClient::new("http://localhost:2480")?;
client.health().await?;Spawn with Custom Binary Path
use tapped::{TapProcess, TapConfig};
let config = TapConfig::builder()
.database_url("sqlite://my-app.db")
.build();
let mut process = TapProcess::spawn("/path/to/tap", config).await?;
let client = process.client()?;
// Use the client...
process.shutdown().await?;Using TapHandle (Recommended)
TapHandle combines process management and client access:
use tapped::{TapHandle, TapConfig};
let config = TapConfig::builder()
.database_url("sqlite://app.db")
.full_network(false)
.build();
let handle = TapHandle::spawn_default(config).await?;
// TapHandle derefs to TapClient, so you can call client methods directly
handle.health().await?;
let count = handle.repo_count().await?;
println!("Tracking {} repos", count);Configuration Options
use tapped::{TapConfig, LogLevel};
use std::time::Duration;
let config = TapConfig::builder()
// Database
.database_url("sqlite://tap.db")
.max_db_conns(10)
// Network
.bind("127.0.0.1:2480")
.relay_url("wss://bsky.network".parse().unwrap())
.plc_url("https://plc.directory".parse().unwrap())
// Filtering
.signal_collection("app.bsky.feed.post")
.collection_filter("app.bsky.feed.post")
.collection_filter("app.bsky.feed.like")
.full_network(false)
// Performance
.firehose_parallelism(10)
.resync_parallelism(5)
.outbox_parallelism(10)
.outbox_capacity(10000)
// Timeouts
.repo_fetch_timeout(Duration::from_secs(30))
.startup_timeout(Duration::from_secs(60))
.shutdown_timeout(Duration::from_secs(10))
// Logging
.log_level(LogLevel::Info)
.build();Working with Events
Events must be manually acknowledged:
use tapped::{Event, RecordAction};
let (mut receiver, mut ack_sender) = client.channel().await?;
while let Ok((event, ack_id)) = receiver.recv().await {
match event {
Event::Record(record) => {
match record.action {
RecordAction::Create => {
// Access the raw JSON as a string
if let Some(json) = record.record_as_str() {
println!("Raw JSON: {}", json);
}
// Or deserialize to a specific type
// let post: MyPostType = record.deserialize_as()?;
}
RecordAction::Update => { /* ... */ }
RecordAction::Delete => { /* ... */ }
_ => {}
}
}
Event::Identity(identity) => {
println!("{} is now @{}", identity.did, identity.handle);
}
}
// Ack must be sent manually
ack_sender.ack(ack_id).await?;
}Managing Repositories
// Add repos to track
client.add_repos(&["did:plc:abc123", "did:plc:def456"]).await?;
// Remove repos
client.remove_repos(&["did:plc:abc123"]).await?;
// Get info about a specific repo
let info = client.repo_info("did:plc:def456").await?;
println!("State: {:?}, Records: {}", info.state, info.records);
// Resolve a DID to its document
let doc = client.resolve_did("did:plc:def456").await?;
println!("Handles: {:?}", doc.also_known_as);Checking Stats
let repos = client.repo_count().await?;
let records = client.record_count().await?;
let outbox = client.outbox_buffer().await?;
let resync = client.resync_buffer().await?;
let cursors = client.cursors().await?;
println!("Tracking {} repos with {} records", repos, records);
println!("Outbox buffer: {}, Resync buffer: {}", outbox, resync);
println!("Firehose cursor: {:?}", cursors.firehose);Example: Syncing standard.site Records with Schema Generation and Validation
The repository includes a complete example demonstrating how to sync and validate ATProto records using tapped together with the jacquard crates.
The jacquard ecosystem provides runtime validation of records against their lexicon constraints, and the ability to generate Rust structs from lexicon JSON files.
tapped/
├── tapped/ # The main tapped library
├── lexicons-example/ # Generated types from lexicon schemas
│ ├── lexicons/ # Source lexicon JSON files
│ │ ├── site.standard.publication.json
│ │ ├── site.standard.document.json
│ │ └── ...
│ └── src/ # Generated Rust code
└── standard-site-sync/ # Example binary using both packages
These files were generated like so:
# Install the code generator
cargo install jacquard-lexgen
jacquard-codegen -i lexicons-example/lexicons -o lexicons-example/srcThis produces strongly-typed structs with built-in validation. For example, the site.standard.publication lexicon becomes:
use lexicons_example::site_standard::publication::Publication;
// Deserialize from JSON
let publication: Publication = serde_json::from_str(json)?;
// Validate against lexicon constraints (max length, grapheme limits, etc.)
publication.validate()?;
// Access typed fields
println!("Name: {}", publication.name.as_str());
println!("URL: {}", publication.url.as_str());For more detail see process_record_event in main.rs.
License
MIT