Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Replicacion

RouchDB implementa el protocolo de replicacion de CouchDB, permitiendo sincronizacion bidireccional entre bases de datos locales y remotas.

Conceptos basicos

La replicacion copia cambios de una base de datos fuente a una base de datos destino. Es incremental: solo transfiere los documentos que han cambiado desde la ultima sincronizacion.

Replicacion basica

Push (local a remoto)

#![allow(unused)]
fn main() {
use rouchdb::Database;

let local = Database::open("mydb.redb", "mydb")?;
let remote = Database::http("http://admin:password@localhost:5984/mydb");

let result = local.replicate_to(&remote).await?;
println!("Docs leidos: {}", result.docs_read);
println!("Docs escritos: {}", result.docs_written);
}

Pull (remoto a local)

#![allow(unused)]
fn main() {
let result = local.replicate_from(&remote).await?;
}

Sync (bidireccional)

#![allow(unused)]
fn main() {
let (push, pull) = local.sync(&remote).await?;
println!("Push: {} escritos", push.docs_written);
println!("Pull: {} escritos", pull.docs_written);
}

Configurar CouchDB

Para pruebas, usa Docker Compose:

docker compose up -d

Esto levanta CouchDB en http://localhost:15984 con credenciales admin:password.

Para crear una base de datos en CouchDB:

curl -X PUT http://admin:password@localhost:15984/mydb

Opciones de replicacion

#![allow(unused)]
fn main() {
use rouchdb::ReplicationOptions;

let result = local.replicate_to_with_opts(&remote, ReplicationOptions {
    batch_size: 50,
    ..Default::default()
}).await?;
}
CampoTipoDefaultDescripcion
batch_sizeu64100Numero de cambios a procesar por iteracion
batches_limitu6410Maximo numero de lotes a buffear
filterOption<ReplicationFilter>NoneFiltro opcional para replicacion selectiva
sinceOption<Seq>NoneSecuencia inicial (en vez de leer checkpoint)
checkpointbooltrueDeshabilitar con false para no guardar/leer checkpoints
liveboolfalseHabilitar replicacion continua
retryboolfalseReintentar automaticamente en caso de error
poll_intervalDuration500msIntervalo de sondeo en modo continuo
back_off_functionOption<Box<dyn Fn(u32) -> Duration + Send + Sync>>NoneFuncion de backoff para reintentos

Replicacion filtrada

Se puede replicar un subconjunto de documentos usando ReplicationFilter:

#![allow(unused)]
fn main() {
use rouchdb::{ReplicationOptions, ReplicationFilter};

// Por IDs de documento
let result = local.replicate_to_with_opts(&remote, ReplicationOptions {
    filter: Some(ReplicationFilter::DocIds(vec!["doc1".into(), "doc2".into()])),
    ..Default::default()
}).await?;

// Por selector Mango
let result = local.replicate_to_with_opts(&remote, ReplicationOptions {
    filter: Some(ReplicationFilter::Selector(serde_json::json!({"type": "invoice"}))),
    ..Default::default()
}).await?;

// Por closure personalizado
let result = local.replicate_to_with_opts(&remote, ReplicationOptions {
    filter: Some(ReplicationFilter::Custom(std::sync::Arc::new(|change| {
        change.id.starts_with("public:")
    }))),
    ..Default::default()
}).await?;
}

Checkpoints

Los checkpoints permiten que la replicacion se reanude despues de una interrupcion. RouchDB guarda el progreso en documentos locales (_local/{replication_id}) en ambos lados.

Cuando la replicacion se reinicia:

  1. Lee el checkpoint de ambos lados
  2. Encuentra la ultima secuencia comun
  3. Reanuda desde ahi (no necesita empezar desde cero)

Resultado de la replicacion

#![allow(unused)]
fn main() {
let result = local.replicate_to(&remote).await?;

if result.ok {
    println!("Replicacion exitosa");
    println!("  Docs leidos: {}", result.docs_read);
    println!("  Docs escritos: {}", result.docs_written);
    println!("  Ultima secuencia: {:?}", result.last_seq);
} else {
    println!("Replicacion con errores:");
    for err in &result.errors {
        println!("  - {}", err);
    }
}
}
CampoTipoDescripcion
okbooltrue si no hubo errores
docs_readu64Cambios procesados
docs_writtenu64Documentos escritos en el destino
errorsVec<String>Errores individuales por documento
last_seqSeqUltima secuencia alcanzada

Ejemplo completo

Sincronizar una base de datos local redb con CouchDB:

use rouchdb::Database;

#[tokio::main]
async fn main() -> rouchdb::Result<()> {
    // Base de datos local persistente
    let local = Database::open("app.redb", "app")?;

    // CouchDB remoto
    let remote = Database::http("http://admin:password@localhost:5984/app");

    // Agregar datos localmente (funciona offline)
    local.put("nota:1", serde_json::json!({
        "titulo": "Mi primera nota",
        "contenido": "Hola desde RouchDB!"
    })).await?;

    // Cuando hay conexion, sincronizar
    let (push, pull) = local.sync(&remote).await?;

    println!("Sincronizacion completa:");
    println!("  Push: {} docs", push.docs_written);
    println!("  Pull: {} docs", pull.docs_written);

    Ok(())
}

Replicacion con eventos

Usa replicate_to_with_events() para recibir eventos de progreso durante la replicacion:

#![allow(unused)]
fn main() {
use rouchdb::ReplicationEvent;

let (result, mut rx) = local.replicate_to_with_events(
    &remote,
    ReplicationOptions::default(),
).await?;

while let Ok(event) = rx.try_recv() {
    match event {
        ReplicationEvent::Active => println!("Replicacion iniciada"),
        ReplicationEvent::Change { docs_read } => {
            println!("Progreso: {} docs leidos", docs_read);
        }
        ReplicationEvent::Complete(r) => {
            println!("Completado: {} escritos", r.docs_written);
        }
        ReplicationEvent::Error(msg) => println!("Error: {}", msg),
        ReplicationEvent::Paused => println!("Esperando cambios..."),
    }
}
}

Replicacion continua (live)

La replicacion continua se ejecuta en segundo plano, sondeando periodicamente por nuevos cambios. Equivalente a { live: true } en PouchDB.

#![allow(unused)]
fn main() {
use rouchdb::{ReplicationOptions, ReplicationEvent};

let (mut rx, handle) = local.replicate_to_live(&remote, ReplicationOptions {
    live: true,
    poll_interval: std::time::Duration::from_millis(500),
    retry: true,
    ..Default::default()
});

// Procesar eventos en un loop
tokio::spawn(async move {
    while let Some(event) = rx.recv().await {
        match event {
            ReplicationEvent::Complete(r) => {
                println!("Lote completado: {} docs escritos", r.docs_written);
            }
            ReplicationEvent::Paused => {
                println!("Actualizado, esperando nuevos cambios...");
            }
            _ => {}
        }
    }
});

// Cancelar cuando sea necesario
handle.cancel();
}

El ReplicationHandle controla la replicacion:

  • handle.cancel() detiene la replicacion.
  • Si se descarta el handle (Drop), la replicacion tambien se cancela.

Manejo de errores

  • Error de red: la replicacion se detiene. El checkpoint guarda el progreso para reanudar despues.
  • Error de autenticacion (401/403): la replicacion se detiene inmediatamente.
  • Error en documento individual: se registra en result.errors pero la replicacion continua con los demas documentos.
  • Conflicto de checkpoint (409): se reintenta con la ultima revision.