Descarga el Workflow de n8n de esta lección:
En la lección anterior, logramos escanear recursivamente todas nuestras carpetas, pero creamos un problema masivo: cada vez que ejecutamos el flujo, todos los archivos se vuelven a procesar y se insertan como duplicados en nuestra base de datos vectorial.
En esta lección, implementaremos una lógica de sincronización inteligente. Nuestro objetivo es que el flujo solo procese archivos que sean nuevos o que hayan sido modificados desde la última vez que se ejecutó.
Parte 1: El Problema y la Estrategia
Si ejecutas el flujo actual, verás que procesa los 20 archivos. Si lo ejecutas de nuevo, vuelve a procesar los mismos 20 archivos. Esto es ineficiente y corrompe nuestros datos.
Nuestra estrategia se basará en dos pilares:
- Una nueva tabla de «memoria»: Crearemos una tabla en Postgres llamada
Ingested_files(Archivos Ingeridos) que actuará como un registro. Guardará elFile_idy, lo más importante, elmd5Checksumde cada archivo que procesemos. - Lógica condicional: Antes de procesar un archivo, consultaremos esta tabla:
- «¿Ya existe un archivo con este
File_idY estemd5Checksum?» - Si la respuesta es SÍ: El archivo existe y no ha cambiado. Lo ignoramos.
- Si la respuesta es NO: Es un archivo nuevo o es un archivo existente que ha cambiado (su
md5Checksumes diferente). En este caso, debemos:- Borrar todos los vectores antiguos asociados con ese
File_id. - Procesar el nuevo archivo e insertar los nuevos vectores.
- Actualizar (o insertar) el registro en nuestra tabla
Ingested_filescon el nuevomd5Checksum.
- Borrar todos los vectores antiguos asociados con ese
- «¿Ya existe un archivo con este
Configuración del Espacio de Trabajo
Para desarrollar esta lógica sin esperar a que se procesen todos los archivos cada vez, vamos a desconectar temporalmente la recursividad.
- En tu flujo, ve al primer nodo IF (el que separa archivos de carpetas).
- Desconecta la salida «false» (la que iba al segundo nodo «Search»).
- Asegúrate de tener al menos un archivo PDF en la carpeta raíz de Google Drive.
De esta manera, nuestro flujo solo procesará un archivo, permitiéndonos construir y probar la lógica de forma rápida.
Parte 2: Preparación de la Base de Datos
Necesitamos modificar nuestra base de datos para que pueda almacenar la información de seguimiento.
Paso 2.1: Crear la Tabla de Seguimiento ingested_files
En tu editor de SQL (como pgAdmin o el editor de Neon), ejecuta la siguiente consulta para crear nuestra tabla de nombre ingested-files para recordar los archivos que ya hemos insertado en nuestra base de datos vectorial.
CREATE TABLE IF NOT EXISTS ingested_files (
file_id text PRIMARY KEY, -- Google Drive file id
name text,
mime_type text,
md5_checksum text,
modified_time timestamptz,
last_ingested timestamptz DEFAULT now()
);
Esta tabla almacenará, entre otras cosas, el file_id único y la «huella digital» (md5_checksum) de cada archivo que procesemos.
Paso 2.2: Actualizar la Tabla de Vectores
Nuestra tabla n8n_vectors actual no sabe a qué archivo pertenece cada «chunk» (fragmento de texto). Vamos a añadirle una columna file_id.
En tu editor SQL, ejecuta:
ALTER TABLE n8n_vectors
ADD COLUMN "file_id" VARCHAR(255);
Nota: Si prefieres, puedes usar la interfaz gráfica de tu gestor de base de datos para añadir una nueva columna llamada file_id de tipo
textovarchar(255).
Paso 2.3: El Primer Filtro (¿Archivo ya procesado?)
De vuelta en n8n, justo después de la salida «true» del primer nodo IF (el que confirma «es un archivo»), añade un nodo PostgreSQL.
- Configúralo para «Execute Query» (Ejecutar Consulta).
- Renómbralo: «¿Archivo ya procesado?»
- Pega la siguiente consulta SQL:
select count(*) as ingested from public.ingested_files
where file_id = '{{ $json.id }}' and md5_checksum ='{{ $json.md5Checksum }}'
Esta consulta cuenta cuántos archivos existen con exactamente el mismo ID y el mismo Checksum.
- Si devuelve
0(en formato string:"0"), el archivo es nuevo o ha cambiado. - Si devuelve
1(o más), el archivo ya fue procesado y no ha cambiado.
Paso 2.4: El Nodo «IF» de Decisión
Añade un nodo IF después del nodo ¿Archivo ya procesado?.
- Renómbralo: «
¿Fue cargado anteriormente?« - Configura la condición para comprobar si el archivo SÍ fue cargado.
- Condición:
- Value 1:
{{ $json.ingested }} - Operation:
String->is not equal to - Value 2:
0
- Value 1:
De esta forma:
- Salida «true»: El conteo no es 0 (es 1 o más). Significa que el archivo ya existe y no ha cambiado.
- Salida «false»: El conteo es 0. Significa que debemos procesar el archivo.
Paso 2.5: Finalizar la Rama «true»
Conecta la salida «true» del nodo ¿Fue cargado anteriormente? a un nodo NoOp (Do Nothing). Renómbralo a «No hacer nada". Esta rama está terminada.
Paso 2.6: Aislar Variables (Nodo «Edit Fields»)
El resto de nuestra lógica se conectará a la salida «false».
- Conecta un nodo Edit Fields a la salida «false».
- Renómbralo exactamente así: «
Extraer variables«. Esto nos ahorrará un paso de configuración más adelante. - Configúralo para extraer solo los datos que necesitamos, usando las expresiones del nodo
IF "¿Es un archivo?:id:{{ $('¿Es un archivo?').item.json.id }}name:{{ $('¿Es un archivo?').item.json.name }}mimeType:{{ $('¿Es un archivo?').item.json.mimeType }}md5Checksum:{{ $('¿Es un archivo?').item.json.md5Checksum }}modifiedTime:{{ $('¿Es un archivo?').item.json.modifiedTime }}
Parte 3: Implementación de la Lógica
Ahora construiremos el flujo de procesamiento para los archivos nuevos o actualizados, conectándolos después del nodo "Extraer variables".
Paso 3.1: Borrar Referencias Viejas (DELETE)
Antes de insertar nuevos vectores, debemos borrar los viejos.
- Añade un nodo PostgreSQL después de «
Extraer variables" - Renómbralo: «
Borrar referencias viejas" - Configúralo para «Execute Query».
- Pega la siguiente consulta:
delete from n8n_vectors where file_id= '{{ $json.id }}'
- ¡MUY IMPORTANTE! Ve a la pestaña «Settings» (Configuración) del nodo y activa la opción «Always Output Data» (Siempre Generar Salida). Esto evita que el flujo se detenga si no encuentra ningún registro para borrar (por ejemplo, en el caso de un archivo nuevo).
Paso 3.2: El Truco del Nodo «Merge»
Necesitamos asegurarnos de que el borrado (DELETE) se complete antes de continuar, pero queremos que los datos de "Extraer variables" sigan fluyendo.
- Añade un nodo Merge.
- Renómbralo: «
Esperar eliminación". - Conecta «
Extraer variables"a la entrada Input 1. - Conecta «
Borrar referencias viejas"a la entrada Input 2. - Configuración del Nodo Merge:
- Mode: Choose Branch.
- Number of inputs: 2.
- Output Type: Wait for All Inputs to Arrive.
- Output:
Dataof Specified Input. - Use Data of Input: 1.
Esto fuerza a n8n a esperar a que ambas ramas terminen (incluyendo el borrado) pero solo deja pasar los datos limpios de "Extraer variables".
Paso 3.3: Insertar en la Tabla de Seguimiento
Ahora que los datos viejos están borrados, registramos el nuevo archivo en nuestra tabla de seguimiento.
- Añade un nodo PostgreSQL después del
Merge. - Renómbralo:
"Insertar documento« - Configura la operación para «Insert» (Insertar).
- Schema:
public - Table:
Ingested_files - Mapea las columnas usando los datos que vienen del nodo:
fileId:{{ $json.id }}fileName:{{ $json.name }}mimeType:{{ $json.mimeType }}md5Checksum:{{ $json.md5Checksum }}modifiedTime:{{ $json.modifiedTime }}
Paso 3.4: Conectar el Flujo de Vectorización
Conecta la salida de "Insertar documento" al flujo que ya teníamos: Descargar documento -> Postgres PGvector Store.
Paso 3.5: Actualizar los Nuevos Vectores (UPDATE)
El nodo PGVectorStore crea los nuevos chunks de vectores, pero los crea con la columna File_id en null. Nuestro último paso es actualizar esos nuevos chunks con el File_id correcto.
- Añade un nodo PostgreSQL después del nodo Postgres
PGVector Store. - Renómbralo: «
Actualizar File _id en vectores«. - Configúralo para «Execute Query».
- Pega la siguiente consulta (esta usa el nombre de tu nodo
Extraer Variables):
UPDATE n8n_vectors
SET file_id = '{{ $('Extraer variables').item.json.id }}'
WHERE metadata->'loc'->'lines' = '{{ $json.metadata.loc.lines.toJsonString() }}' and file_id is null
Esta consulta es avanzada: actualiza el File_id solo en los registros donde el File_id es nulo y la ubicación (location) del documento coincide con la que acabamos de insertar.
- Conecta la salida de este nodo a un nodo NoOp y renómbralo
Termina Proceso.
Parte 4: La Prueba de Fuego
¡Es hora de probar todo el sistema!
- Revisar Nodo Recursivo: Ve al segundo nodo «Search» (el que estaba en la rama «false» de la recursividad) y asegúrate de que en «Additional Fields» también tengas marcado el asterisco (
*). Esto es vital para que los archivos en subcarpetas también traigan sumd5Checksum. - Purgar las Tablas: Ve a tu editor SQL y vacía ambas tablas para empezar de cero:
TRUNCATE TABLE "Ingested_files"; TRUNCATE TABLE n8n_vectors;
- Reconectar Recursividad: Vuelve al primer nodo IF (Es un archivo) y reconecta la salida «false» al segundo nodo «Search».
- ¡Ejecuta el Flujo!
Resultados Esperados:
- Primera Ejecución: El flujo tardará un tiempo. Procesará todos los archivos de la raíz y de todas las subcarpetas. Si revisas tu base de datos,
Ingested_filestendrá 20+ registros yn8n_vectorstendrá todos los chunks con susFile_idcorrespondientes. - Segunda Ejecución: ¡El flujo debería terminar casi instantáneamente! Cada archivo pasará por «
¿Fue cargado anteriormente?«, elIFdeterminará queingestedes"1", y el flujo se desviará a «No hacer nada".
¡Felicidades! Has construido un sistema RAG robusto que se sincroniza con Google Drive