🖥️ Study: Workflow Executor Local (Electron App)
📋 Executive Summary
Proposal: Build an Electron application, similar to ml-trainer-local and whatsapp-local-agent, that runs workflows locally with no dependency on the Cloudflare Workers backend.
Key Benefits:
- ✅ Local workflow execution (no network round trip to the backend)
- ✅ Fully offline AI (integration with ML Trainer Local)
- ✅ Full control from a mobile device (via the local API)
- ✅ Zero Workers/D1 costs for execution
- ✅ Full privacy (data never leaves the device)
🎯 Main Use Cases
1. Home Automation + Local AI
User away from home → Launches workflow from mobile
↓
Home PC runs the workflow locally
↓
- Queries ML Trainer Local (embeddings, classification)
- Runs local Python/Node scripts
- Controls IoT devices (IoT Hub)
- Sends WhatsApp messages (WhatsApp Local Agent)
↓
Response to mobile via WebSocket
2. Data Processing Without Internet
Data processing workflow
→ Reads local files
→ Processes with local AI (ML Trainer)
→ Generates reports
→ Saves locally + deferred sync
3. Testing and Development
Developer → Tests workflows without deploying to production
→ Local debugging with breakpoints
→ Fast iteration without Workers limits
🏗️ Proposed Architecture
Project Structure
workflow-executor-local/
├── package.json
├── README.md
├── EXECUTOR_README.md
├── .env.example
├── electron/
│   ├── main.js              # Main process (Electron)
│   ├── preload.js           # IPC bridge
│   └── tray.js              # System tray
├── src/
│   ├── server.js            # HTTP server (Fastify)
│   ├── executor.js          # Adapted WorkflowExecutor
│   ├── database.js          # Local SQLite
│   ├── scheduler.js         # Local cron jobs
│   └── services/
│       ├── mlTrainer.js     # ML Trainer integration
│       ├── whatsappAgent.js # WhatsApp integration
│       ├── iotHub.js        # IoT Hub integration
│       └── fileSystem.js    # File operations
├── ui/
│   ├── index.html           # Electron dashboard
│   ├── app.js               # UI logic
│   └── styles.css           # Styles
└── assets/
    └── icon.png             # App icon
🔧 Technical Components
1. Local WorkflowExecutor
Backend adaptation:
// src/executor.js
import { exec } from 'node:child_process';
import { promises as fs } from 'node:fs';
import { promisify } from 'node:util';
import { WorkflowExecutor as BaseExecutor } from '../../../functions/api/workflows/WorkflowExecutor.js';

const execAsync = promisify(exec);

export class LocalWorkflowExecutor extends BaseExecutor {
  constructor(execution, db, services) {
    // Use local SQLite instead of D1
    super(execution, db, {
      AI: services.mlTrainer, // ML Trainer Local as the AI provider
      WHATSAPP: services.whatsappAgent,
      IOT_HUB: services.iotHub,
      // ... other local services
    });
  }

  // Overrides for nodes that need adapting
  async executeAIAgentNode(node) {
    // Use ML Trainer Local instead of Gemini/OpenAI
    const response = await this.env.AI.infer({
      model: node.config.model || 'default',
      prompt: this.resolveVariables(node.config.prompt),
      maxTokens: node.config.maxTokens || 500
    });
    return {
      success: true,
      output: { response }
    };
  }

  async executeFileSystemNode(node) {
    // NEW: file system operations
    const { operation, path: targetPath, content } = node.config;
    switch (operation) {
      case 'read': {
        const data = await fs.readFile(targetPath, 'utf8');
        return { success: true, output: { content: data } };
      }
      case 'write':
        await fs.writeFile(targetPath, content);
        return { success: true, output: {} };
      case 'list': {
        const files = await fs.readdir(targetPath);
        return { success: true, output: { files } };
      }
      default:
        // Fail loudly instead of silently returning undefined
        throw new Error(`Unsupported file system operation: ${operation}`);
    }
  }

  async executeScriptNode(node) {
    // NEW: run local Python/Node/Bash scripts.
    // exec() runs the command through a shell, so node.config.command must come from a trusted workflow.
    try {
      const { stdout, stderr } = await execAsync(node.config.command);
      // Many tools write warnings to stderr; a resolved promise means exit code 0
      return { success: true, output: { stdout, stderr } };
    } catch (error) {
      // execAsync rejects on a non-zero exit code; the error carries stdout and stderr
      return {
        success: false,
        output: { stdout: error.stdout ?? '', stderr: error.stderr ?? error.message }
      };
    }
  }
}
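Because the script node hands a workflow-supplied string to the operating system, it may be worth restricting what can run. The following is a minimal sketch, assuming a hypothetical allowlist of executables (`ALLOWED_EXECUTABLES`) and a hypothetical helper name (`parseAllowedCommand`); neither is part of the original design. It rejects shell metacharacters and splits the command so it could be passed to `child_process.execFile(file, args)` instead of a shell.

```javascript
// Hypothetical allowlist; not part of the original design.
const ALLOWED_EXECUTABLES = new Set(['python3', 'node', 'bash']);

// Split a command line into an executable and its arguments, rejecting
// anything that would need a shell to interpret.
function parseAllowedCommand(command, allowed = ALLOWED_EXECUTABLES) {
  if (typeof command !== 'string' || command.trim() === '') {
    throw new Error('Command must be a non-empty string');
  }
  // Shell metacharacters enable chaining, redirection and substitution.
  if (/[;&|<>`$(){}\\\n"']/.test(command)) {
    throw new Error('Command contains shell metacharacters');
  }
  const [file, ...args] = command.trim().split(/\s+/);
  if (!allowed.has(file)) {
    throw new Error(`Executable not allowed: ${file}`);
  }
  return { file, args };
}
```

This sketch deliberately does not support quoting, so arguments containing spaces are rejected rather than guessed at.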
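2. HTTP Server (Local API)
// src/server.js
import Fastify from 'fastify';
import cors from '@fastify/cors';
import ws from '@fastify/websocket';
import db from './database.js';
import mlTrainerService from './services/mlTrainer.js';
import whatsappAgentService from './services/whatsappAgent.js';
import iotHubService from './services/iotHub.js';
// executeWorkflowAsync and subscribeToExecution are used below but not defined in this study;
// they are assumed to live elsewhere in the app.

const fastify = Fastify({ logger: true });
await fastify.register(cors, { origin: '*' });
await fastify.register(ws);

// ═══════════════════════════════════════════════════════════════
// REST API
// ═══════════════════════════════════════════════════════════════

// GET /api/status
fastify.get('/api/status', async (request, reply) => {
  // Probe the three services in parallel so one slow service does not delay the others
  const [mlTrainer, whatsappAgent, iotHub] = await Promise.all([
    mlTrainerService.isAvailable(),
    whatsappAgentService.isAvailable(),
    iotHubService.isAvailable()
  ]);
  return {
    status: 'running',
    uptime: process.uptime(),
    // NOTE: getActiveExecutions is not among the helpers listed in src/database.js and would need adding
    executions: await db.getActiveExecutions(),
    services: { mlTrainer, whatsappAgent, iotHub }
  };
});

// POST /api/workflows/execute
fastify.post('/api/workflows/execute', async (request, reply) => {
  const { workflowId, workflowDefinition, trigger, triggerData } = request.body ?? {};
  if (!workflowId || !workflowDefinition) {
    return reply.code(400).send({ success: false, error: 'workflowId and workflowDefinition are required' });
  }
  // Create the execution in the local DB
  const executionId = await db.createExecution({
    workflowId,
    workflowDefinition,
    trigger,
    triggerData,
    status: 'pending'
  });
  // Run the workflow asynchronously; the client tracks progress via the executionId
  executeWorkflowAsync(executionId).catch(err => {
    console.error('Workflow execution error:', err);
  });
  return {
    success: true,
    executionId
  };
});

// GET /api/workflows/:executionId
fastify.get('/api/workflows/:executionId', async (request, reply) => {
  const execution = await db.getExecution(request.params.executionId);
  if (!execution) {
    return reply.code(404).send({ success: false, error: 'Execution not found' });
  }
  return execution;
});

// GET /api/workflows (list all executions)
fastify.get('/api/workflows', async (request, reply) => {
  const executions = await db.getAllExecutions({
    // Query string values arrive as strings, so convert before use
    limit: Number(request.query.limit) || 50,
    status: request.query.status
  });
  return { executions };
});

// DELETE /api/workflows/:executionId
fastify.delete('/api/workflows/:executionId', async (request, reply) => {
  await db.deleteExecution(request.params.executionId);
  return { success: true };
});

// ═══════════════════════════════════════════════════════════════
// WebSocket (Real-time updates)
// ═══════════════════════════════════════════════════════════════
fastify.register(async function (fastify) {
  fastify.get('/ws', { websocket: true }, (connection, req) => {
    // Older @fastify/websocket releases pass a stream wrapper with the socket at connection.socket;
    // newer releases pass the socket itself as the first argument.
    connection.socket.on('message', message => {
      let data;
      try {
        data = JSON.parse(message.toString());
      } catch {
        return; // Ignore messages that are not valid JSON
      }
      if (data.type === 'subscribe') {
        // Subscribe to updates for one execution
        subscribeToExecution(data.executionId, connection);
      }
    });
  });
});

// Start the server (0.0.0.0 makes the API reachable from other devices on the local network)
await fastify.listen({ port: 8766, host: '0.0.0.0' });
console.log('🚀 Workflow Executor Local running on http://localhost:8766');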
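The WebSocket handler above calls `subscribeToExecution`, which the study does not define. One possible shape, shown here as a sketch, is an in-memory registry keyed by execution ID. The names `subscribers`, `unsubscribe` and `publishExecutionUpdate` are hypothetical, and the code assumes the connection object exposes the socket at `connection.socket`, as in the handler above.

```javascript
// executionId -> Set of subscriber connections (hypothetical in-memory registry)
const subscribers = new Map();

function subscribeToExecution(executionId, connection) {
  if (!subscribers.has(executionId)) subscribers.set(executionId, new Set());
  subscribers.get(executionId).add(connection);
  // Drop the subscription when the socket closes so the registry does not leak
  connection.socket.on('close', () => unsubscribe(executionId, connection));
}

function unsubscribe(executionId, connection) {
  const set = subscribers.get(executionId);
  if (!set) return;
  set.delete(connection);
  if (set.size === 0) subscribers.delete(executionId);
}

// Would be called by the executor whenever an execution changes state.
// Returns the number of subscribers the update was sent to.
function publishExecutionUpdate(executionId, update) {
  const set = subscribers.get(executionId);
  if (!set) return 0;
  const payload = JSON.stringify({ executionId, ...update });
  let delivered = 0;
  for (const connection of set) {
    connection.socket.send(payload);
    delivered += 1;
  }
  return delivered;
}
```

Keeping the registry in memory is enough for a single-process desktop app; subscriptions are simply lost on restart and clients would need to resubscribe.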
3. Local Database (SQLite)
// src/database.js
import Database from 'better-sqlite3';
import path from 'path';
import { app } from 'electron';

const dbPath = path.join(app.getPath('userData'), 'workflow-executor.db');
const db = new Database(dbPath);

// Initialize the schema
db.exec(`
  CREATE TABLE IF NOT EXISTS workflow_executions (
    id TEXT PRIMARY KEY,
    workflow_id TEXT NOT NULL,
    workflow_name TEXT,
    workflow_definition TEXT NOT NULL,
    trigger TEXT NOT NULL,
    trigger_data TEXT,
    status TEXT DEFAULT 'pending',
    current_node_id TEXT,
    context TEXT,
    results TEXT,
    logs TEXT,
    error TEXT,
    created_at TEXT NOT NULL,
    started_at TEXT,
    completed_at TEXT
  );
  CREATE TABLE IF NOT EXISTS scheduled_workflows (
    id TEXT PRIMARY KEY,
    workflow_id TEXT NOT NULL,
    workflow_definition TEXT NOT NULL,
    schedule TEXT NOT NULL, -- cron expression
    enabled INTEGER DEFAULT 1,
    last_run TEXT,
    next_run TEXT,
    created_at TEXT NOT NULL
  );
  CREATE INDEX IF NOT EXISTS idx_executions_status ON workflow_executions(status);
  CREATE INDEX IF NOT EXISTS idx_executions_created ON workflow_executions(created_at DESC);
`);

export default {
  createExecution: (data) => { /* ... */ },
  getExecution: (id) => { /* ... */ },
  updateExecution: (id, updates) => { /* ... */ },
  getAllExecutions: (filters) => { /* ... */ },
  deleteExecution: (id) => { /* ... */ },
  // Scheduled workflows
  createSchedule: (data) => { /* ... */ },
  getSchedules: () => { /* ... */ },
  updateSchedule: (id, updates) => { /* ... */ },
  deleteSchedule: (id) => { /* ... */ }
};
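The schema stores the workflow definition and trigger data as TEXT, so the API payload has to be serialized on the way in and parsed on the way out. The sketch below shows one way that mapping might look. The function names `toExecutionRow` and `fromExecutionRow` and the `workflowName` payload field are assumptions made for illustration; the study leaves the helper bodies unspecified.

```javascript
// Map an API payload to the workflow_executions columns.
// `id` and `now` are passed in so the function stays pure and testable.
function toExecutionRow(data, id, now = new Date()) {
  return {
    id,
    workflow_id: data.workflowId,
    workflow_name: data.workflowName ?? null, // assumed optional field
    workflow_definition: JSON.stringify(data.workflowDefinition),
    trigger: data.trigger,
    trigger_data: data.triggerData === undefined ? null : JSON.stringify(data.triggerData),
    status: data.status ?? 'pending',
    created_at: now.toISOString()
  };
}

// Inverse mapping for reads: parse the JSON columns back into objects.
function fromExecutionRow(row) {
  return {
    ...row,
    workflow_definition: JSON.parse(row.workflow_definition),
    trigger_data: row.trigger_data === null ? null : JSON.parse(row.trigger_data)
  };
}
```

A row object of this shape could be bound to a better-sqlite3 prepared statement that uses named parameters such as `@id` and `@workflow_id`.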
4. Integration with Local Services
ML Trainer Local
// src/services/mlTrainer.js
import axios from 'axios';

const ML_TRAINER_URL = 'http://localhost:8765';

export default {
  async isAvailable() {
    try {
      const response = await axios.get(`${ML_TRAINER_URL}/health`);
      return response.status === 200;
    } catch {
      return false;
    }
  },

  async infer({ model, prompt, maxTokens }) {
    const response = await axios.post(`${ML_TRAINER_URL}/v1/completions`, {
      model: model || 'default',
      prompt,
      max_tokens: maxTokens || 500
    });
    return response.data.choices[0].text;
  },

  async embed(text) {
    const response = await axios.post(`${ML_TRAINER_URL}/api/embeddings`, {
      text
    });
    return response.data.embedding;
  },

  async classify(text, labels) {
    const response = await axios.post(`${ML_TRAINER_URL}/api/classify`, {
      text,
      labels
    });
    return response.data.prediction;
  }
};
WhatsApp Local Agent
// src/services/whatsappAgent.js
import axios from 'axios';

const WHATSAPP_URL = 'http://localhost:3600';

export default {
  async isAvailable() {
    try {
      const response = await axios.get(`${WHATSAPP_URL}/api/health`);
      return response.status === 200;
    } catch {
      return false;
    }
  },

  async sendMessage({ phone, message }) {
    const response = await axios.post(`${WHATSAPP_URL}/api/messages/send`, {
      phone,
      message
    });
    return response.data;
  },

  async sendImage({ phone, imageUrl, caption }) {
    const response = await axios.post(`${WHATSAPP_URL}/api/messages/image`, {
      phone,
      imageUrl,
      caption
    });
    return response.data;
  }
};
IoT Hub
// src/services/iotHub.js
import axios from 'axios';

const IOT_HUB_URL = 'http://localhost:8080';

export default {
  async isAvailable() {
    try {
      const response = await axios.get(`${IOT_HUB_URL}/api/health`);
      return response.status === 200;
    } catch {
      return false;
    }
  },

  async publishMQTT({ topic, payload }) {
    const response = await axios.post(`${IOT_HUB_URL}/api/mqtt/publish`, {
      topic,
      payload
    });
    return response.data;
  },

  async getDeviceStatus(deviceId) {
    const response = await axios.get(`${IOT_HUB_URL}/api/devices/${deviceId}`);
    return response.data;
  }
};
5. Scheduler (Cron Jobs)
// src/scheduler.js
import cron from 'node-cron';
import db from './database.js';
// executeWorkflow is assumed to be exported by executor.js; the listing in this study only shows the class
import { executeWorkflow } from './executor.js';

const activeTasks = new Map();

export function initScheduler() {
  // Load scheduled workflows at startup and register only the enabled ones
  const enabled = db.getSchedules().filter(schedule => schedule.enabled);
  enabled.forEach(schedule => registerSchedule(schedule));
  console.log(`📅 Scheduler initialized with ${enabled.length} active schedules`);
}

export function registerSchedule(schedule) {
  // Replace any task already registered under the same ID
  if (activeTasks.has(schedule.id)) {
    activeTasks.get(schedule.id).stop();
  }
  // Reject malformed cron expressions up front rather than failing inside cron.schedule
  if (!cron.validate(schedule.schedule)) {
    console.error(`Invalid cron expression for schedule ${schedule.id}: ${schedule.schedule}`);
    return;
  }
  const task = cron.schedule(schedule.schedule, async () => {
    console.log(`⏰ Executing scheduled workflow: ${schedule.workflow_id}`);
    try {
      await executeWorkflow({
        workflowId: schedule.workflow_id,
        workflowDefinition: JSON.parse(schedule.workflow_definition),
        trigger: 'schedule',
        triggerData: {
          scheduleId: schedule.id,
          scheduledAt: new Date().toISOString()
        }
      });
      db.updateSchedule(schedule.id, {
        last_run: new Date().toISOString()
      });
    } catch (error) {
      console.error('Scheduled workflow error:', error);
    }
  });
  activeTasks.set(schedule.id, task);
}

export function unregisterSchedule(scheduleId) {
  if (activeTasks.has(scheduleId)) {
    activeTasks.get(scheduleId).stop();
    activeTasks.delete(scheduleId);
  }
}
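If a scheduled workflow takes longer than its cron interval, the next tick could start a second run while the first is still in progress. The following sketch wraps a task so that overlapping invocations are skipped. It assumes the scheduler does not prevent overlap on its own, and `skipIfRunning` is a hypothetical helper name, not something defined in the project.

```javascript
// Wrap an async task so a new invocation is skipped while a previous one is still running.
function skipIfRunning(task) {
  let running = false;
  return async function guarded(...args) {
    if (running) return { skipped: true };
    running = true;
    try {
      return { skipped: false, result: await task(...args) };
    } finally {
      // Always release the flag, even when the task throws
      running = false;
    }
  };
}
```

The callback passed to `cron.schedule` could be wrapped with this helper, one wrapper per schedule, so each schedule tracks its own running state.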
🖥️ UI Dashboard (Electron)
<!-- ui/index.html -->
<!DOCTYPE html>
<html>
<head>
  <meta charset="UTF-8">
  <title>Workflow Executor Local</title>
  <link rel="stylesheet" href="styles.css">
</head>
<body class="dark">
  <div class="app-container">
    <!-- Sidebar -->
    <nav class="sidebar">
      <div class="logo">
        <img src="../assets/icon.png" alt="Logo">
        <span>Workflow Executor</span>
      </div>
      <ul class="nav-list">
        <li data-tab="dashboard" class="active">
          <span>📊</span> Dashboard
        </li>
        <li data-tab="executions">
          <span>▶️</span> Executions
        </li>
        <li data-tab="schedules">
          <span>⏰</span> Schedules
        </li>
        <li data-tab="services">
          <span>🔌</span> Services
        </li>
        <li data-tab="logs">
          <span>📋</span> Logs
        </li>
        <li data-tab="settings">
          <span>⚙️</span> Settings
        </li>
      </ul>
    </nav>
    <!-- Main Content -->
    <main class="main-content">
      <!-- Dashboard Tab -->
      <div id="tab-dashboard" class="tab-content active">
        <h1>Workflow Executor Dashboard</h1>
        <div class="stats-grid">
          <div class="stat-card">
            <div class="stat-icon">⚡</div>
            <div class="stat-info">
              <h3>Active Executions</h3>
              <div class="stat-value" id="active-count">0</div>
            </div>
          </div>
          <div class="stat-card">
            <div class="stat-icon">✅</div>
            <div class="stat-info">
              <h3>Completed Today</h3>
              <div class="stat-value" id="completed-today">0</div>
            </div>
          </div>
          <div class="stat-card">
            <div class="stat-icon">⏰</div>
            <div class="stat-info">
              <h3>Scheduled</h3>
              <div class="stat-value" id="scheduled-count">0</div>
            </div>
          </div>
        </div>
        <div class="services-status">
          <h2>Connected Services</h2>
          <div class="service-list">
            <div class="service-item">
              <span class="service-icon">🤖</span>
              <span>ML Trainer Local</span>
              <span id="ml-trainer-status" class="status-badge offline">Offline</span>
            </div>
            <div class="service-item">
              <span class="service-icon">💬</span>
              <span>WhatsApp Agent</span>
              <span id="whatsapp-status" class="status-badge offline">Offline</span>
            </div>
            <div class="service-item">
              <span class="service-icon">🏠</span>
              <span>IoT Hub</span>
              <span id="iot-status" class="status-badge offline">Offline</span>
            </div>
          </div>
        </div>
      </div>
      <!-- Executions Tab -->
      <div id="tab-executions" class="tab-content">
        <div class="tab-header">
          <h1>Workflow Executions</h1>
          <button class="btn btn-primary" onclick="openExecuteModal()">
            ▶️ Execute Workflow
          </button>
        </div>
        <div class="executions-list" id="executions-list">
          <!-- Populated by JavaScript -->
        </div>
      </div>
      <!-- Schedules Tab -->
      <div id="tab-schedules" class="tab-content">
        <div class="tab-header">
          <h1>Scheduled Workflows</h1>
          <button class="btn btn-primary" onclick="openScheduleModal()">
            ⏰ New Schedule
          </button>
        </div>
        <div class="schedules-list" id="schedules-list">
          <!-- Populated by JavaScript -->
        </div>
      </div>
    </main>
  </div>
  <script src="app.js"></script>
</body>
</html>
📱 Mobile Control
API Endpoints for Mobile
// From the mobile device (on the same local network)
const API_URL = 'http://192.168.1.100:8766'; // IP of the home PC

// Run a workflow
const response = await fetch(`${API_URL}/api/workflows/execute`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    workflowId: 'workflow_123',
    workflowDefinition: { /* ... */ },
    trigger: 'manual',
    triggerData: {
      source: 'mobile',
      user: 'gonzalo',
      location: 'madrid'
    }
  })
});
if (!response.ok) {
  throw new Error(`Execute request failed: ${response.status}`);
}
// fetch() resolves to a Response object; the JSON body must be parsed to read executionId
const { executionId } = await response.json();

// Monitor the execution via WebSocket (http:// becomes ws://, https:// becomes wss://)
const ws = new WebSocket(`${API_URL.replace(/^http/, 'ws')}/ws`);
ws.onopen = () => {
  ws.send(JSON.stringify({
    type: 'subscribe',
    executionId
  }));
};
ws.onmessage = (event) => {
  const update = JSON.parse(event.data);
  console.log('Workflow update:', update);
};
Secure Tunnel (Optional)
# With ngrok (for access from outside the local network)
ngrok http 8766
# Now reachable from anywhere:
# https://abc123.ngrok.io/api/workflows/execute
💰 Cost Comparison
| Feature | Backend (Workers) | Local Executor |
|---|---|---|
| Workflow execution | $0.50/million requests | ✅ Free |
| AI inference | $0.45/1K tokens | ✅ Free (ML Trainer) |
| Database (D1) | $0.75/million reads | ✅ Free (SQLite) |
| Storage | $0.015/GB-month | ✅ Free (local disk) |
| Bandwidth | $0.09/GB | ✅ Free (local network) |
| Scheduled workflows | $0.30/execution | ✅ Free (local cron) |
Estimated savings: ~$50-200/month for workflow-intensive usage
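To see how the per-unit prices add up to a monthly figure, the arithmetic can be sketched as below. The unit prices are copied as-is from the Backend (Workers) column of the table; the usage profile in `example` is purely illustrative and not a measurement, and the function and field names are hypothetical.

```javascript
// Unit prices copied from the Backend (Workers) column of the table.
const BACKEND_PRICES = {
  perMillionRequests: 0.50,
  perThousandTokens: 0.45,
  perMillionReads: 0.75,
  perGbMonthStored: 0.015,
  perGbTransferred: 0.09,
  perScheduledRun: 0.30
};

// Sum each usage quantity multiplied by its unit price.
function estimateMonthlyBackendCost(usage, prices = BACKEND_PRICES) {
  return (
    (usage.requests / 1e6) * prices.perMillionRequests +
    (usage.tokens / 1e3) * prices.perThousandTokens +
    (usage.reads / 1e6) * prices.perMillionReads +
    usage.storedGb * prices.perGbMonthStored +
    usage.transferredGb * prices.perGbTransferred +
    usage.scheduledRuns * prices.perScheduledRun
  );
}

// Hypothetical monthly usage; every number here is an assumption for illustration.
const example = {
  requests: 1_000_000,
  tokens: 100_000,
  reads: 2_000_000,
  storedGb: 10,
  transferredGb: 5,
  scheduledRuns: 30
};

console.log(estimateMonthlyBackendCost(example).toFixed(2)); // prints "56.60"
```

With this profile the total is dominated by AI inference ($45.00) and scheduled runs ($9.00), which suggests those two rows are where most of the estimated savings would come from.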
🎯 Implementation Roadmap
Phase 1: MVP (1 week)
- Basic project structure
- Adapted WorkflowExecutor (without AI)
- HTTP server + REST API
- SQLite database
- Basic UI dashboard
Phase 2: Integrations (1 week)
- ML Trainer Local integration
- WhatsApp Agent integration
- IoT Hub integration
- WebSocket for real-time updates
Phase 3: Advanced Features (1 week)
- Scheduler (cron jobs)
- File system operations
- Script execution (Python/Node/Bash)
- Persistent logs
Phase 4: Polish (3 days)
- Improved UI
- Installers (Windows/Mac/Linux)
- Complete documentation
- E2E tests
⚠️ Limitations and Considerations
Limitations
Unsupported Nodes:
- Twilio Voice (requires internet)
- ElevenLabs Voice (requires internet)
- Email (requires SMTP)
- Outgoing webhooks (require internet)
Network Dependencies:
- Google Drive (requires OAuth)
- External APIs
Security:
- Local API has no authentication (local network only)
- ngrok tunnels expose the unauthenticated service to the public internet
Solutions
Hybrid Nodes:
- Detect whether internet is available
- Fall back to local alternatives
Sync Queue:
- Queue operations that require internet
- Run them once a connection is available
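The hybrid-node and sync-queue ideas can be combined in one small component: run an operation immediately when online, otherwise hold it and replay it later. The sketch below is in-memory only. `SyncQueue`, `runOrEnqueue`, `flush` and the `isOnline` callback are all hypothetical names, and how connectivity is actually detected is left to the caller.

```javascript
// Minimal in-memory queue for operations that need internet access.
class SyncQueue {
  constructor(isOnline) {
    this.isOnline = isOnline; // async () => boolean, supplied by the caller
    this.pending = [];
  }

  // Run the operation now when online, otherwise keep it for later.
  async runOrEnqueue(name, operation) {
    if (await this.isOnline()) {
      return { queued: false, result: await operation() };
    }
    this.pending.push({ name, operation });
    return { queued: true };
  }

  // Replay queued operations in order; stop at the first failure so
  // nothing is lost and ordering is preserved.
  async flush() {
    if (!(await this.isOnline())) return 0;
    let completed = 0;
    while (this.pending.length > 0) {
      const { operation } = this.pending[0];
      try {
        await operation();
      } catch {
        break; // Leave the failed operation at the head for the next flush
      }
      this.pending.shift();
      completed += 1;
    }
    return completed;
  }
}
```

Because the queue lives in memory, pending operations are lost if the app restarts; a persistent variant would need to store serializable descriptions of the operations, for example in the local SQLite database.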
Authentication:
- Local API key
- JWT for remote access
- HTTPS with a self-signed certificate
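As an illustration of the local API key option, the check itself can be a small pure function. This is a sketch under assumptions: the `x-api-key` header name and the function names are hypothetical, and the hand-written comparison only keeps the example free of dependencies. In Node, `crypto.timingSafeEqual` is the standard tool for constant-time comparison.

```javascript
// Compare two strings without returning early on the first mismatch.
function constantTimeEqual(a, b) {
  if (typeof a !== 'string' || typeof b !== 'string') return false;
  let diff = a.length ^ b.length;
  const length = Math.max(a.length, b.length);
  for (let i = 0; i < length; i += 1) {
    // charCodeAt returns NaN past the end of a string; treat that as 0
    diff |= (a.charCodeAt(i) || 0) ^ (b.charCodeAt(i) || 0);
  }
  return diff === 0;
}

// `x-api-key` is a hypothetical header name chosen for this example.
function isAuthorized(headers, expectedKey) {
  if (!expectedKey) return false; // Refuse everything when no key is configured
  return constantTimeEqual(headers['x-api-key'], expectedKey);
}
```

A check like this could run in a Fastify `onRequest` hook that replies with 401 when it returns false, with the expected key loaded from the `.env` file.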
🚀 Conclusion
RECOMMENDATION: ✅ IMPLEMENT
Benefits:
- 🎯 Real, valuable use case (remote control of the PC)
- 💰 Significant savings on Workers costs
- 🔒 Full privacy (offline AI)
- ⚡ No backend round trip (local execution)
- 🏠 Complete ecosystem (ML + WhatsApp + IoT)
Next Step:
Create the workflow-executor-local/ repository and start with Phase 1 (MVP).