Federated Learning Architecture

Overview

BitFlow implements a federated learning pattern in which a single workflow executes in parallel across multiple datacenters, so computation moves to the data rather than the data moving to the computation. Each dataset stays in its home region, which preserves data sovereignty while still supporting distributed AI training and analysis.

Core Concepts

Federated Execution Model

┌─────────────────────────────────────────────────────────────────┐
│                    FEDERATED WORKFLOW                           │
│  User submits single workflow → Executes across multiple DCs    │
│                                                                 │
│  ┌───────────────┐    ┌───────────────┐    ┌───────────────┐  │
│  │   US-EAST-1   │    │   EU-WEST-1   │    │  AP-SOUTH-1   │  │
│  │               │    │               │    │               │  │
│  │ Dataset: US   │    │ Dataset: EU   │    │ Dataset: AP   │  │
│  │ Workflow Run  │    │ Workflow Run  │    │ Workflow Run  │  │
│  │ Tasks: A,B,C  │    │ Tasks: A,B,C  │    │ Tasks: A,B,C  │  │
│  └───────────────┘    └───────────────┘    └───────────────┘  │
│         │                      │                      │        │
│         └──────────────────────┼──────────────────────┘        │
│                                │                               │
│                    ┌───────────▼───────────┐                   │
│                    │   RESULT AGGREGATION  │                   │
│                    │   • Average weights   │                   │
│                    │   • Combine metrics   │                   │
│                    │   • Generate report   │                   │
│                    └───────────────────────┘                   │
└─────────────────────────────────────────────────────────────────┘

Map-Reduce Pattern

The federated learning implementation follows a map-reduce pattern:

  1. Map Phase: Distribute workflow execution to all datacenters containing required datasets
  2. Reduce Phase: Aggregate results from all datacenter runs into a single federated result
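The two phases can be sketched in Go as a fan-out over goroutines followed by a single reduce step. This is only an illustration of the pattern, not BitFlow's scheduler: `partialResult`, `mapReduce`, and the `work` callback are hypothetical names, and the reduce step here simply sums sample counts.

```go
package main

import "sync"

// partialResult is a hypothetical per-datacenter output.
type partialResult struct {
	Datacenter string
	Samples    int
}

// mapReduce runs work once per datacenter in parallel (map phase), waits for
// all of them, and then sums the sample counts (reduce phase).
func mapReduce(datacenters []string, work func(dc string) partialResult) (int, []partialResult) {
	results := make([]partialResult, len(datacenters))

	var wg sync.WaitGroup
	for i, dc := range datacenters {
		wg.Add(1)
		go func(i int, dc string) {
			defer wg.Done()
			results[i] = work(dc) // each goroutine writes only its own slot
		}(i, dc)
	}
	wg.Wait()

	total := 0
	for _, r := range results {
		total += r.Samples
	}
	return total, results
}
```

Because every goroutine writes to a distinct slice index, no mutex is needed, and the results come back in the same order as the input datacenter list.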

Architecture Components

1. Workflow Definition with Federated Configuration

type Workflow struct {
    // Standard fields...
    DatasetMounts   []DatasetMount   `json:"dataset_mounts" gorm:"type:jsonb"`
    FederatedConfig FederatedConfig  `json:"federated_config" gorm:"type:jsonb"`
}

type DatasetMount struct {
    DatasetID   string `json:"dataset_id"`
    MountPath   string `json:"mount_path"`
    Permissions string `json:"permissions,omitempty"`
    Required    bool   `json:"required,omitempty"`
}

type FederatedConfig struct {
    AggregationStrategy string `json:"aggregation_strategy,omitempty"`  // "average", "weighted", "first"
    MinDatacenters      int    `json:"min_datacenters,omitempty"`       // Minimum DCs required
    AllowPartialFailure bool   `json:"allow_partial_failure,omitempty"` // Continue if some DCs fail
}
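As a rough sketch of how this configuration might be decoded and checked before a run is created: `parseFederatedConfig` is a hypothetical helper, and both the defaults it applies ("average" and a minimum of one datacenter) and its rejection of unknown strategies are assumptions rather than documented BitFlow behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FederatedConfig mirrors the struct shown above.
type FederatedConfig struct {
	AggregationStrategy string `json:"aggregation_strategy,omitempty"`
	MinDatacenters      int    `json:"min_datacenters,omitempty"`
	AllowPartialFailure bool   `json:"allow_partial_failure,omitempty"`
}

// parseFederatedConfig decodes a federated_config JSON document, fills in
// assumed defaults, and rejects values the aggregator would not recognize.
func parseFederatedConfig(raw []byte) (FederatedConfig, error) {
	var cfg FederatedConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return cfg, fmt.Errorf("decode federated_config: %w", err)
	}
	if cfg.AggregationStrategy == "" {
		cfg.AggregationStrategy = "average" // assumed default
	}
	switch cfg.AggregationStrategy {
	case "average", "weighted", "first":
	default:
		return cfg, fmt.Errorf("unknown aggregation_strategy %q", cfg.AggregationStrategy)
	}
	if cfg.MinDatacenters < 0 {
		return cfg, fmt.Errorf("min_datacenters must not be negative, got %d", cfg.MinDatacenters)
	}
	if cfg.MinDatacenters == 0 {
		cfg.MinDatacenters = 1 // assumed default: at least one datacenter
	}
	return cfg, nil
}
```

Rejecting an unknown strategy at submission time is a design choice made for this sketch; the alternative is to fall back silently to averaging.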

2. Multi-Datacenter Run Creation

When a workflow with dataset mounts is executed, BitFlow creates one run for each datacenter that hosts a mounted dataset:

func (h *WorkflowHandler) createFederatedRuns(
    workflow models.Workflow,
    workflowDef models.WorkflowDef,
    req RunWorkflowRequest,
    userID, runName, runDescription string,
) ([]models.Run, error) {
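    // 1. Determine datacenters from datasets.
    // NOTE: allMounts is not defined in this excerpt. It is assumed to be the
    // combined list of the workflow's dataset mounts and the request's mounts.
    datacenterMounts := determineDatacentersFromDatasets(allMounts)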

    // 2. Validate minimum datacenter requirement
    if len(datacenterMounts) < workflowDef.FederatedConfig.MinDatacenters {
        return nil, fmt.Errorf("workflow requires at least %d datacenters, but only %d available",
            workflowDef.FederatedConfig.MinDatacenters, len(datacenterMounts))
    }

    // 3. Create run for each datacenter
    var runs []models.Run
    for datacenterID, mounts := range datacenterMounts {
        run := models.Run{
            ID:           generateRunID(),
            UserID:       userID,
            WorkflowID:   workflow.ID,
            DatacenterID: datacenterID,
            Name:         runName,
            Description:  runDescription,
            Status:       models.RunStatusPending,
            Mounts:       mounts,
            // ... other fields
        }
        runs = append(runs, run)
    }

    return runs, nil
}
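The grouping step performed by determineDatacentersFromDatasets is not shown in the source. A minimal sketch of what such a helper could look like follows, assuming a dataset-to-datacenter lookup is available; `groupMountsByDatacenter` and the `locations` map are hypothetical, and the real function takes only the mount list.

```go
package main

import "fmt"

// DatasetMount is a trimmed copy of the struct shown earlier.
type DatasetMount struct {
	DatasetID string
	MountPath string
}

// groupMountsByDatacenter groups mounts by the datacenter that hosts each
// dataset. locations is a hypothetical dataset ID to datacenter ID lookup.
func groupMountsByDatacenter(mounts []DatasetMount, locations map[string]string) (map[string][]DatasetMount, error) {
	grouped := make(map[string][]DatasetMount)
	for _, m := range mounts {
		dc, ok := locations[m.DatasetID]
		if !ok {
			return nil, fmt.Errorf("dataset %q has no known datacenter", m.DatasetID)
		}
		grouped[dc] = append(grouped[dc], m)
	}
	return grouped, nil
}
```

Note that Go randomizes map iteration order, so a loop such as the one in createFederatedRuns will create runs in a different datacenter order on each call unless the keys are sorted first.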

3. Federated Task Distribution

Each datacenter run generates workflow tasks that are published to datacenter-specific queues:

Workflow Execution:
├─ workflow.tasks.dc-us-east-1  ← Tasks for US datacenter
├─ workflow.tasks.dc-eu-west-1  ← Tasks for EU datacenter
└─ workflow.tasks.dc-ap-south-1 ← Tasks for AP datacenter

Dead Letter Queues:
├─ workflow.tasks.dc-us-east-1.dlq
├─ workflow.tasks.dc-eu-west-1.dlq
└─ workflow.tasks.dc-ap-south-1.dlq
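The naming convention above is regular enough to derive in code. The helpers below are a sketch with hypothetical names (`taskQueueName`, `deadLetterQueueName`); only the "workflow.tasks." prefix and ".dlq" suffix come from the listing.

```go
package main

// taskQueuePrefix is the common prefix of all per-datacenter task queues.
const taskQueuePrefix = "workflow.tasks."

// taskQueueName returns the task queue for a datacenter,
// for example "workflow.tasks.dc-us-east-1".
func taskQueueName(datacenterID string) string {
	return taskQueuePrefix + datacenterID
}

// deadLetterQueueName returns the matching dead letter queue,
// which is the task queue name with a ".dlq" suffix.
func deadLetterQueueName(datacenterID string) string {
	return taskQueueName(datacenterID) + ".dlq"
}
```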

4. Result Aggregation

The FederatedAggregator combines results from multiple datacenter runs:

type FederatedAggregator struct {
    db *gorm.DB
}

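func (fa *FederatedAggregator) AggregateResults(workflowID string, runs []models.Run) (*FederatedResult, error) {
    completedRuns := filterCompletedRuns(runs)
    if len(completedRuns) == 0 {
        return nil, fmt.Errorf("workflow %s: no completed runs to aggregate", workflowID)
    }

    // Load the workflow to read its aggregation strategy.
    // (Assumption: the strategy is stored in the workflow's federated_config.)
    var workflow models.Workflow
    if err := fa.db.First(&workflow, "id = ?", workflowID).Error; err != nil {
        return nil, fmt.Errorf("load workflow %s: %w", workflowID, err)
    }

    switch workflow.FederatedConfig.AggregationStrategy {
    case "weighted":
        return fa.weightedAggregation(completedRuns)
    case "first":
        return fa.firstWinsAggregation(completedRuns)
    default: // "average", or no strategy configured
        return fa.averageAggregation(completedRuns)
    }
}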
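The bodies of the aggregation methods are not shown. For the "average" strategy applied to model weights, one plausible core is an element-wise mean over the weight vectors returned by each run. The sketch below assumes each run's weights have already been loaded into a flat []float64; `averageWeights` is a hypothetical name.

```go
package main

import "errors"

// averageWeights returns the element-wise mean of equally sized weight
// vectors, one vector per completed datacenter run.
func averageWeights(updates [][]float64) ([]float64, error) {
	if len(updates) == 0 {
		return nil, errors.New("no completed runs to aggregate")
	}
	avg := make([]float64, len(updates[0]))
	for _, u := range updates {
		if len(u) != len(avg) {
			return nil, errors.New("weight vectors differ in length")
		}
		for i, w := range u {
			avg[i] += w
		}
	}
	for i := range avg {
		avg[i] /= float64(len(updates))
	}
	return avg, nil
}
```

For example, averaging the vectors [1, 2] and [3, 4] yields [2, 3].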

Data Flow Example

Medical AI Training Across Regions

# Workflow: Global Medical AI Training
workflow:
  name: "federated-cancer-detection"
  dataset_mounts:
    - dataset_id: "us-hospital-data" # Located in dc-us-east-1
      mount_path: "/data/training"
      permissions: "read"
    - dataset_id: "eu-hospital-data" # Located in dc-eu-west-1
      mount_path: "/data/training"
      permissions: "read"
    - dataset_id: "asia-hospital-data" # Located in dc-ap-south-1
      mount_path: "/data/training"
      permissions: "read"

  federated_config:
    aggregation_strategy: "weighted" # Weight by dataset size
    min_datacenters: 2 # Require at least 2 regions
    allow_partial_failure: true # Continue if one region fails

Execution Flow

  1. User submits workflow: Single API call to POST /api/v1/workflows/{workflow_id}/run

  2. System creates federated runs:

    {
      "is_federated": true,
      "runs": [
        {
          "run_id": "run_us_001",
          "datacenter": "dc-us-east-1",
          "status": "running"
        },
        {
          "run_id": "run_eu_001",
          "datacenter": "dc-eu-west-1",
          "status": "running"
        },
        {
          "run_id": "run_ap_001",
          "datacenter": "dc-ap-south-1",
          "status": "running"
        }
      ]
    }

  3. Tasks execute in parallel:

    • US datacenter: Trains on local hospital data (compliance: HIPAA)
    • EU datacenter: Trains on local hospital data (compliance: GDPR)
    • AP datacenter: Trains on local hospital data (compliance: local regulations)

  4. Results aggregated:

    {
      "aggregation_strategy": "weighted",
      "participating_runs": 3,
      "results": {
        "model_accuracy": 0.892, // Weighted average
        "total_samples": 150000, // Sum across regions
        "training_time": 3600.5, // Average
        "model_weights": "/federated/model.pth"
      },
      "datacenter_details": [
        { "datacenter": "dc-us-east-1", "samples": 75000, "accuracy": 0.895 },
        { "datacenter": "dc-eu-west-1", "samples": 45000, "accuracy": 0.887 },
        { "datacenter": "dc-ap-south-1", "samples": 30000, "accuracy": 0.891 }
      ]
    }
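The weighted model_accuracy in this result can be reproduced from the datacenter_details entries by weighting each accuracy by its sample count. The sketch below shows that calculation; `datacenterDetail` and `weightedAccuracy` are hypothetical names, not part of the BitFlow codebase.

```go
package main

// datacenterDetail mirrors one entry of "datacenter_details".
type datacenterDetail struct {
	Datacenter string
	Samples    int
	Accuracy   float64
}

// weightedAccuracy returns the sample-weighted mean accuracy and the total
// number of samples across all datacenters.
func weightedAccuracy(details []datacenterDetail) (accuracy float64, totalSamples int) {
	var weightedSum float64
	for _, d := range details {
		weightedSum += float64(d.Samples) * d.Accuracy
		totalSamples += d.Samples
	}
	if totalSamples == 0 {
		return 0, 0 // avoid dividing by zero when no run reported samples
	}
	return weightedSum / float64(totalSamples), totalSamples
}
```

With the three entries above, the weighted sum is 75000 × 0.895 + 45000 × 0.887 + 30000 × 0.891 = 133770, and 133770 / 150000 = 0.8918, which rounds to the reported 0.892.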

Benefits

1. Data Sovereignty

  • No data movement: Data never leaves its original datacenter
  • Regulatory compliance: Each region maintains local compliance (HIPAA, GDPR, etc.)
  • Privacy preservation: Only model updates/aggregated results are shared

2. Scalability

  • Parallel execution: All datacenters run simultaneously
  • Resource utilization: Leverages compute capacity across all regions
  • Fault tolerance: Can continue with partial failures if configured

3. Global AI Training

  • Larger datasets: Combines data from multiple regions without centralization
  • Diverse populations: Trains on varied demographics and conditions
  • Reduced bias: Geographic diversity in the training data can yield more representative models

Implementation Details

Database Schema Support

The federated architecture stores its configuration in JSONB columns in PostgreSQL, added to the existing workflows table:

-- Workflows table supports federated configuration
ALTER TABLE workflows ADD COLUMN dataset_mounts JSONB DEFAULT '[]';
ALTER TABLE workflows ADD COLUMN federated_config JSONB DEFAULT '{}';

-- Runs table already supports datacenter-specific execution
-- workflow_tasks table handles task distribution per datacenter

Message Queue Architecture

RabbitMQ Exchange: bitflow
├─ workflow.tasks.dc-us-east-1     # US East tasks
├─ workflow.tasks.dc-eu-west-1     # EU West tasks
├─ workflow.tasks.dc-ap-south-1    # Asia Pacific tasks
├─ workflow.results                # Aggregated results
├─ workflow.progress               # Progress updates
└─ workflow.logs                   # Execution logs

Dead Letter Exchange: bitflow.dlx
├─ workflow.tasks.dc-us-east-1.dlq
├─ workflow.tasks.dc-eu-west-1.dlq
└─ workflow.tasks.dc-ap-south-1.dlq
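In RabbitMQ, a queue is tied to a dead letter exchange through optional queue arguments set when the queue is declared. The sketch below builds those arguments; `deadLetterArgs` is a hypothetical helper, and routing dead letters by the DLQ name is an assumption about how bitflow.dlx is bound. The argument keys themselves are standard RabbitMQ queue arguments.

```go
package main

// deadLetterArgs builds the optional queue arguments that make RabbitMQ
// republish rejected or expired messages to a dead letter exchange.
func deadLetterArgs(deadLetterExchange, deadLetterRoutingKey string) map[string]interface{} {
	return map[string]interface{}{
		// Exchange that receives dead-lettered messages.
		"x-dead-letter-exchange": deadLetterExchange,
		// Routing key used when republishing. Assumed here to be the DLQ name.
		"x-dead-letter-routing-key": deadLetterRoutingKey,
	}
}
```

A caller would pass the returned map as the arguments table when declaring a task queue, for example deadLetterArgs("bitflow.dlx", "workflow.tasks.dc-us-east-1.dlq") for the US East queue.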

CLI Support

The CLI displays federated execution status:

$ bitflow workflows run federated-cancer-detection --mounts us-data:/data,eu-data:/data

 Federated workflow started
🌍 Executing across 3 datacenters:
 dc-us-east-1: Training on 75K samples
 dc-eu-west-1: Training on 45K samples
 dc-ap-south-1: Training on 30K samples

 Progress: [██████████] 100% (3/3 datacenters completed)
📊 Results aggregated using weighted strategy

📈 Final Results:
 Model Accuracy: 89.2%
 Total Samples: 150,000
 Training Time: 3600.5s
 Model Weights: /output/federated_model.pth

Security Considerations

1. Authentication & Authorization

  • Each datacenter validates user permissions for local datasets
  • JWT tokens used for cross-datacenter communication
  • Role-based access control (RBAC) enforced per region

2. Network Security

  • TLS encryption for all inter-datacenter communication
  • VPN/private network connections between regions
  • Certificate-based authentication for RabbitMQ queues

3. Data Protection

  • No raw data transmission between datacenters
  • Only encrypted model updates and aggregated metrics shared
  • Differential privacy can be applied to model updates
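To illustrate the last point, a common way to apply differential privacy to a model update is to clip its L2 norm and then add Gaussian noise scaled to the clipping bound. The sketch below shows only that mechanism and is not BitFlow's implementation: `clipL2`, `privatize`, and the `normal` sampling callback are hypothetical, and a real deployment would also need privacy accounting and a carefully chosen noise source.

```go
package main

import "math"

// clipL2 returns a copy of update scaled down so that its L2 norm is at most
// maxNorm. Updates already within the bound are returned unchanged.
func clipL2(update []float64, maxNorm float64) []float64 {
	var sumSquares float64
	for _, v := range update {
		sumSquares += v * v
	}
	scale := 1.0
	if norm := math.Sqrt(sumSquares); norm > maxNorm {
		scale = maxNorm / norm
	}
	clipped := make([]float64, len(update))
	for i, v := range update {
		clipped[i] = v * scale
	}
	return clipped
}

// privatize clips the update and adds noise with standard deviation
// noiseMultiplier*maxNorm to every coordinate. normal must return samples
// from a standard normal distribution.
func privatize(update []float64, maxNorm, noiseMultiplier float64, normal func() float64) []float64 {
	out := clipL2(update, maxNorm)
	sigma := noiseMultiplier * maxNorm
	for i := range out {
		out[i] += normal() * sigma
	}
	return out
}
```

Passing the sampler in as a function keeps the sketch deterministic under test; in practice `normal` could be the NormFloat64 method of a math/rand generator.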

4. Audit & Compliance

  • Complete audit trail of federated executions
  • Per-datacenter compliance reporting
  • Data residency guarantees maintained

Configuration Examples

Simple Federated Training

workflow_definition:
  dataset_mounts:
    - dataset_id: "medical-us"
      mount_path: "/data"
      permissions: "read"
    - dataset_id: "medical-eu"
      mount_path: "/data"
      permissions: "read"

  federated_config:
    aggregation_strategy: "average"
    min_datacenters: 1
    allow_partial_failure: true

High-Availability Federated Analysis

workflow_definition:
  dataset_mounts:
    - dataset_id: "pharma-trial-us"
      mount_path: "/clinical_data"
      permissions: "read"
      required: true
    - dataset_id: "pharma-trial-eu"
      mount_path: "/clinical_data"
      permissions: "read"
      required: true
    - dataset_id: "pharma-trial-asia"
      mount_path: "/clinical_data"
      permissions: "read"
      required: false

  federated_config:
    aggregation_strategy: "weighted"
    min_datacenters: 2
    allow_partial_failure: false # All required DCs must succeed
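How required, min_datacenters, and allow_partial_failure interact is not spelled out in full. One plausible reading, sketched below, is that a failed run on a required dataset is fatal unless partial failure is allowed, a failed run on an optional dataset is never fatal, and the number of successful runs must always reach min_datacenters. `runOutcome` and `canAggregate` are hypothetical names, and the rule itself is an assumption.

```go
package main

// FederatedConfig is a trimmed copy of the struct shown earlier.
type FederatedConfig struct {
	MinDatacenters      int
	AllowPartialFailure bool
}

// runOutcome describes how one datacenter run ended.
type runOutcome struct {
	Datacenter string
	Required   bool // true if the run mounts at least one required dataset
	Succeeded  bool
}

// canAggregate reports whether the federated result may be computed under
// the assumed policy described above.
func canAggregate(cfg FederatedConfig, outcomes []runOutcome) bool {
	succeeded := 0
	for _, o := range outcomes {
		switch {
		case o.Succeeded:
			succeeded++
		case o.Required && !cfg.AllowPartialFailure:
			return false // a required datacenter failed and that is not tolerated
		}
	}
	return succeeded >= cfg.MinDatacenters
}
```

Under this reading, the configuration above still aggregates if only the optional pharma-trial-asia run fails, because both required runs succeed and two datacenters meet the minimum.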

API Endpoints

Create Federated Workflow Run

POST /api/v1/workflows/{workflow_id}/run
Content-Type: application/json

{
  "name": "Global Cancer Detection Training",
  "description": "Federated learning across US, EU, and Asia",
  "mounts": [
    {"dataset_id": "us-oncology-data", "mount_path": "/data/training"},
    {"dataset_id": "eu-oncology-data", "mount_path": "/data/training"},
    {"dataset_id": "asia-oncology-data", "mount_path": "/data/training"}
  ],
  "parameters": {
    "epochs": 100,
    "batch_size": 32,
    "learning_rate": 0.001
  }
}

Response

{
  "success": true,
  "data": {
    "is_federated": true,
    "runs": [
      {
        "run_id": "run_us_12345",
        "datacenter": "dc-us-east-1",
        "status": "pending",
        "datasets": ["us-oncology-data"],
        "estimated_duration": "2 hours"
      },
      {
        "run_id": "run_eu_12346",
        "datacenter": "dc-eu-west-1",
        "status": "pending",
        "datasets": ["eu-oncology-data"],
        "estimated_duration": "2 hours"
      },
      {
        "run_id": "run_asia_12347",
        "datacenter": "dc-ap-south-1",
        "status": "pending",
        "datasets": ["asia-oncology-data"],
        "estimated_duration": "2 hours"
      }
    ],
    "aggregation_config": {
      "strategy": "weighted",
      "min_datacenters": 2,
      "allow_partial_failure": true
    }
  }
}
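A Go client might build the request for this endpoint as follows. This is a sketch under assumptions: the field layout of `RunWorkflowRequest` is inferred from the JSON body above rather than taken from BitFlow's source, and `RunMount`, `newRunRequest`, and the base URL are hypothetical. Authentication headers are omitted.

```go
package main

import (
	"bytes"
	"encoding/json"
	"net/http"
	"net/url"
)

// RunMount is one entry of the "mounts" array in the request body.
type RunMount struct {
	DatasetID string `json:"dataset_id"`
	MountPath string `json:"mount_path"`
}

// RunWorkflowRequest models the JSON body shown above (inferred layout).
type RunWorkflowRequest struct {
	Name        string                 `json:"name"`
	Description string                 `json:"description"`
	Mounts      []RunMount             `json:"mounts"`
	Parameters  map[string]interface{} `json:"parameters,omitempty"`
}

// newRunRequest builds the POST request that starts a federated run.
func newRunRequest(baseURL, workflowID string, body RunWorkflowRequest) (*http.Request, error) {
	payload, err := json.Marshal(body)
	if err != nil {
		return nil, err
	}
	endpoint := baseURL + "/api/v1/workflows/" + url.PathEscape(workflowID) + "/run"
	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}
```

The returned request can then be sent with any http.Client, and the response decoded into a struct that mirrors the "data" object above.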

Monitoring and Observability

Federated Execution Metrics

  • Datacenter participation: Track which DCs are active
  • Cross-region latency: Monitor communication delays
  • Resource utilization: Per-datacenter compute usage
  • Data residency compliance: Audit data movement (should be zero)
  • Aggregation performance: Time to combine results

Health Checks

$ bitflow status federated

🌍 Federated Learning Status
├─ Active Datacenters: 3/3
│  ├─ dc-us-east-1: Healthy (latency: 45ms)
│  ├─ dc-eu-west-1: Healthy (latency: 120ms)
│  └─ dc-ap-south-1: Healthy (latency: 200ms)
├─ Queue Health: All queues operational
├─ Active Federated Runs: 12
└─ Aggregation Service: Running

Future Enhancements

1. Advanced Aggregation Strategies

  • Secure aggregation: Cryptographically secure multi-party computation
  • Differential privacy: Built-in privacy preservation
  • Byzantine fault tolerance: Resilience against malicious participants

2. Dynamic Datacenter Selection

  • Performance-based routing: Select fastest available datacenters
  • Cost optimization: Route to lowest-cost regions
  • Compliance-aware selection: Auto-select compliant datacenters

3. Adaptive Scaling

  • Auto-scaling: Increase resources based on federated workload
  • Load balancing: Distribute work across available capacity
  • Predictive scheduling: Pre-position resources for planned federated runs

This federated learning architecture lets BitFlow train AI models across regions while meeting strict data sovereignty and regulatory compliance requirements.