Taskpack Architecture

Overview

Taskpacks are the executable units that run on Bitedge nodes. Each taskpack bundles the code, dependencies, and configuration needed to process data locally while preserving privacy.

Taskpack Structure

taskpack/
├── manifest.yaml           # Package metadata and requirements
├── requirements.txt        # Python dependencies
├── Dockerfile              # Container definition (optional)
├── src/
│   ├── __init__.py
│   ├── task.py             # Main task entry point
│   ├── preprocessor.py     # Data preprocessing logic
│   ├── trainer.py          # Training logic
│   └── utils/
├── configs/
│   ├── default.yaml        # Default configuration
│   └── schema.json         # Configuration schema
├── tests/
│   └── test_task.py
└── README.md
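
A node or CI job could check this layout before unpacking anything further. The following is a minimal sketch, not part of any stated BitFlow API: the function name `missing_entries` and the choice of which entries count as required are assumptions made for illustration (the tree above marks only the Dockerfile as optional).

```python
from pathlib import Path
from typing import List

# Assumption: these entries are treated as mandatory. The layout above only
# states that the Dockerfile is optional; the rest is an illustrative choice.
REQUIRED_ENTRIES = [
    "manifest.yaml",
    "requirements.txt",
    "src/__init__.py",
    "src/task.py",
    "configs/default.yaml",
    "configs/schema.json",
]


def missing_entries(taskpack_root: str) -> List[str]:
    """Return the required files that are absent under taskpack_root."""
    root = Path(taskpack_root)
    return [entry for entry in REQUIRED_ENTRIES if not (root / entry).is_file()]
```

An empty list means the directory has every file this sketch looks for; anything else names what is missing, which is easier to act on than a bare pass/fail.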

Manifest Structure

# manifest.yaml
apiVersion: v1
kind: TaskPackage
metadata:
  name: fedavg-resnet50-pathology
  version: 1.0.0
  author: BitFlow Team
  description: Federated learning package for pathology image classification

spec:
  type: training # training | analysis | preprocessing | inference
  framework: pytorch # pytorch | tensorflow | transformers | custom

  requirements:
    python: ">=3.8"
    gpu: required # required | optional | none
    memory: 16GB
    storage: 50GB

  inputs:
    - name: images
      type: wsi # e.g. wsi | dicom | numpy | csv (other types appear in the packages below)
      format: [".svs", ".tiff", ".ndpi"]
    - name: labels
      type: annotations
      format: [".json", ".xml"]

  outputs:
    - name: model_updates
      type: model_weights
      format: ".pth"
    - name: metrics
      type: metrics
      format: ".json"

  federated:
    protocol: fedavg # fedavg | fedprox | scaffold
    aggregation_compatible: true
    differential_privacy: optional
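
The inline comments in the manifest enumerate the allowed values for several fields, which makes a basic structural check straightforward. The sketch below works on an already-parsed manifest (a plain dict, however it was loaded) and is only an illustration: `validate_manifest` and the decision to treat `requirements.gpu` as mandatory are assumptions, and `configs/schema.json` would be the natural home for a fuller schema.

```python
from typing import Any, Dict, List

# Allowed values copied from the inline comments in the manifest above.
ALLOWED = {
    "type": {"training", "analysis", "preprocessing", "inference"},
    "gpu": {"required", "optional", "none"},
    "protocol": {"fedavg", "fedprox", "scaffold"},
}


def validate_manifest(manifest: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the manifest passed."""
    problems: List[str] = []
    if manifest.get("kind") != "TaskPackage":
        problems.append("kind must be TaskPackage")

    metadata = manifest.get("metadata", {})
    for key in ("name", "version"):
        if not metadata.get(key):
            problems.append(f"metadata.{key} is missing")

    spec = manifest.get("spec", {})
    if spec.get("type") not in ALLOWED["type"]:
        problems.append(f"spec.type is invalid: {spec.get('type')!r}")

    gpu = spec.get("requirements", {}).get("gpu")
    if gpu not in ALLOWED["gpu"]:
        problems.append(f"spec.requirements.gpu is invalid: {gpu!r}")

    # The federated section only appears in some manifests, so it is optional.
    federated = spec.get("federated")
    if federated is not None and federated.get("protocol") not in ALLOWED["protocol"]:
        problems.append(f"spec.federated.protocol is invalid: {federated.get('protocol')!r}")

    # 'format' is written both as a single string and as a list in the examples.
    for section in ("inputs", "outputs"):
        for item in spec.get(section, []):
            formats = item.get("format")
            if isinstance(formats, str):
                formats = [formats]
            if not formats or not all(
                isinstance(f, str) and f.startswith(".") for f in formats
            ):
                problems.append(f"spec.{section}.{item.get('name')}: bad format")
    return problems
```

Returning every problem at once, rather than raising on the first, lets a package author fix a manifest in a single pass.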

Taskpack Types

1. LLM Training Package

# manifest.yaml for LLM training
apiVersion: v1
kind: TaskPackage
metadata:
  name: pathology-llm-finetuning
  version: 1.0.0
  description: Fine-tune LLM on pathology reports and WSI descriptions

spec:
  type: training
  framework: transformers

  requirements:
    python: ">=3.9"
    gpu: required
    memory: 32GB
    gpu_memory: 24GB # For LLM

  inputs:
    - name: reports
      type: text
      format: [".txt", ".json"]
    - name: wsi_features
      type: embeddings
      format: ".npy"
    - name: qa_pairs
      type: json
      format: ".json"

  outputs:
    - name: model_adapter # LoRA/QLoRA adapters
      type: model_weights
      format: ".bin"
    - name: tokenizer
      type: tokenizer
      format: ".json"

  llm_config:
    base_model: "meta-llama/Llama-2-7b-hf"
    training_method: "lora" # lora | qlora | full
    lora_config:
      r: 16
      lora_alpha: 32
      target_modules: ["q_proj", "v_proj"]
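
To give a feel for what these LoRA settings imply, the arithmetic below estimates the adapter size. It assumes the standard LoRA formulation (each adapted weight of shape d_out x d_in gains two low-rank matrices, A of shape r x d_in and B of shape d_out x r, scaled by lora_alpha / r) and the published Llama-2-7B dimensions (32 decoder layers, hidden size 4096, square q_proj and v_proj). The function name is illustrative only.

```python
def lora_trainable_params(r: int, d_in: int, d_out: int, n_modules: int) -> int:
    """Parameters added by LoRA: each module gains A (r x d_in) and B (d_out x r)."""
    return n_modules * r * (d_in + d_out)


# Llama-2-7B dimensions (assumed here): 32 decoder layers, hidden size 4096.
N_LAYERS = 32
HIDDEN = 4096
TARGETS_PER_LAYER = 2  # q_proj and v_proj, as in target_modules above

trainable = lora_trainable_params(
    r=16, d_in=HIDDEN, d_out=HIDDEN, n_modules=N_LAYERS * TARGETS_PER_LAYER
)
scaling = 32 / 16  # lora_alpha / r

print(trainable)  # 8388608
print(scaling)    # 2.0
```

Roughly 8.4 million trainable parameters is on the order of 0.1% of the base model, which is why the output of this package is a small adapter file rather than a full set of weights.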

2. Pharma Discovery Package

# manifest.yaml for pharma biomarker discovery
apiVersion: v1
kind: TaskPackage
metadata:
  name: biomarker-discovery-spatial
  version: 2.0.0
  description: Spatial biomarker discovery for drug response prediction

spec:
  type: analysis
  framework: custom

  requirements:
    python: ">=3.8"
    gpu: optional
    memory: 32GB

  inputs:
    - name: wsi_images
      type: wsi
      format: [".svs", ".tiff"]
    - name: clinical_data
      type: structured
      format: [".csv", ".parquet"]
    - name: treatment_response
      type: structured
      format: ".json"

  outputs:
    - name: biomarker_candidates
      type: analysis_results
      format: ".json"
    - name: spatial_features
      type: features
      format: ".h5"
    - name: statistical_report
      type: report
      format: [".pdf", ".html"]

  analysis_config:
    methods:
      - spatial_statistics
      - cell_neighborhood_analysis
      - molecular_correlation
    privacy_preserving: true
    statistical_significance: 0.05

Task Entry Point Interface

# src/task.py - Standard interface all packages must implement

import shutil
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict

class BitflowTask(ABC):
    """Base class for all BitFlow tasks"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.node_id = config.get('node_id')
        self.task_id = config.get('task_id')
        self.round = config.get('round', 0)

    @abstractmethod
    def validate_inputs(self, input_paths: Dict[str, str]) -> bool:
        """Validate input data meets requirements"""
        pass

    @abstractmethod
    def preprocess(self, input_paths: Dict[str, str]) -> Dict[str, Any]:
        """Preprocess data before main execution"""
        pass

    @abstractmethod
    def execute(self, preprocessed_data: Dict[str, Any]) -> Dict[str, Any]:
        """Main task execution"""
        pass

    @abstractmethod
    def postprocess(self, results: Dict[str, Any]) -> Dict[str, str]:
        """Postprocess results and return a mapping of output name -> file path"""
        pass

    def save_output(self, path: str, output_dir: str, name: str) -> str:
        """Copy one output file into output_dir and return its new path.

        Assumed default behavior: run() calls this method, but the original
        interface did not define it. Subclasses may override it.
        """
        destination = Path(output_dir) / f"{name}{Path(path).suffix}"
        destination.parent.mkdir(parents=True, exist_ok=True)
        # Skip the copy when postprocess already wrote to the final location
        if Path(path).resolve() != destination.resolve():
            shutil.copy2(path, destination)
        return str(destination)

    def run(self, input_paths: Dict[str, str], output_dir: str) -> Dict[str, Any]:
        """Standard execution pipeline"""
        # Validate
        if not self.validate_inputs(input_paths):
            raise ValueError("Input validation failed")

        # Execute pipeline
        preprocessed = self.preprocess(input_paths)
        results = self.execute(preprocessed)
        outputs = self.postprocess(results)

        # Save outputs
        for name, path in outputs.items():
            self.save_output(path, output_dir, name)

        return {
            'status': 'success',
            'outputs': outputs,
            'metrics': results.get('metrics', {})
        }
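
To make the pipeline order concrete, here is a toy task that goes through all four stages. It is a sketch under stated assumptions: the base class is restated in condensed form so the snippet runs on its own, `save_output` is a hypothetical helper that copies a file into the output directory, and `LineCountTask` is invented purely for illustration.

```python
import json
import shutil
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict


class BitflowTask(ABC):
    """Condensed stand-in for the base class above, so this sketch runs alone."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config

    @abstractmethod
    def validate_inputs(self, input_paths: Dict[str, str]) -> bool: ...

    @abstractmethod
    def preprocess(self, input_paths: Dict[str, str]) -> Dict[str, Any]: ...

    @abstractmethod
    def execute(self, preprocessed_data: Dict[str, Any]) -> Dict[str, Any]: ...

    @abstractmethod
    def postprocess(self, results: Dict[str, Any]) -> Dict[str, str]: ...

    def save_output(self, path: str, output_dir: str, name: str) -> str:
        # Hypothetical helper: copy the file produced by postprocess.
        destination = Path(output_dir) / f"{name}{Path(path).suffix}"
        shutil.copy2(path, destination)
        return str(destination)

    def run(self, input_paths: Dict[str, str], output_dir: str) -> Dict[str, Any]:
        if not self.validate_inputs(input_paths):
            raise ValueError("Input validation failed")
        results = self.execute(self.preprocess(input_paths))
        outputs = self.postprocess(results)
        for name, path in outputs.items():
            self.save_output(path, output_dir, name)
        return {
            "status": "success",
            "outputs": outputs,
            "metrics": results.get("metrics", {}),
        }


class LineCountTask(BitflowTask):
    """Toy task: counts the lines of a text file and writes the count as JSON."""

    def validate_inputs(self, input_paths):
        return Path(input_paths["reports"]).is_file()

    def preprocess(self, input_paths):
        return {"lines": Path(input_paths["reports"]).read_text().splitlines()}

    def execute(self, preprocessed_data):
        return {"metrics": {"line_count": len(preprocessed_data["lines"])}}

    def postprocess(self, results):
        # Write to a scratch location; run() copies it into output_dir.
        scratch = Path(tempfile.mkdtemp()) / "metrics.json"
        scratch.write_text(json.dumps(results["metrics"]))
        return {"metrics": str(scratch)}
```

Calling `LineCountTask({}).run({"reports": "reports.txt"}, "out")` with an existing `out` directory would leave `out/metrics.json` behind and return the status dictionary. Only the derived count leaves the task, never the report text, which mirrors the privacy intent of the interface.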

Example Implementations

LLM Training Task

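# src/llm_training_task.py
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import LoraConfig, get_peft_model, TaskType

from .task import BitflowTask  # base class defined in src/task.py

# Note: validate_inputs() and postprocess() are omitted here for brevity;
# a concrete task must implement them before it can be instantiated.
class PathologyLLMTask(BitflowTask):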
    def __init__(self, config):
        super().__init__(config)
        self.model = None
        self.tokenizer = None

    def preprocess(self, input_paths):
        # Load pathology reports and WSI descriptions
        reports = self.load_reports(input_paths['reports'])
        wsi_features = self.load_features(input_paths['wsi_features'])

        # Create training dataset
        dataset = self.create_dataset(reports, wsi_features)
        return {'dataset': dataset}

    def execute(self, preprocessed_data):
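        # Initialize the base model in 8-bit to reduce GPU memory use.
        # Note: recent transformers releases deprecate the bare load_in_8bit
        # argument in favor of passing a BitsAndBytesConfig via quantization_config.
        base_model = AutoModelForCausalLM.from_pretrained(
            self.config['base_model'],
            load_in_8bit=True,
            device_map="auto"
        )

        # LoRA settings are read from flat config keys. The manifest nests them
        # under spec.llm_config.lora_config, so the runtime is assumed to
        # flatten them before constructing the task.
        lora_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            r=self.config['lora_r'],
            lora_alpha=self.config['lora_alpha'],
            target_modules=self.config['target_modules']
        )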

        model = get_peft_model(base_model, lora_config)

        # Train
        trainer = self.create_trainer(model, preprocessed_data['dataset'])
        trainer.train()

        return {
            'model': model,
            'metrics': trainer.state.log_history
        }
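
The task reads flat keys such as `lora_r` and `base_model`, while the LLM manifest nests the same values under `spec.llm_config`. One way to bridge the two is a small mapping step before the task is constructed. This is a sketch, not the documented loader: `flatten_llm_config` is a hypothetical helper, and the flat key names are taken from the task code above.

```python
from typing import Any, Dict


def flatten_llm_config(manifest: Dict[str, Any]) -> Dict[str, Any]:
    """Map spec.llm_config from a parsed manifest onto the flat keys the task reads."""
    llm = manifest["spec"]["llm_config"]
    lora = llm["lora_config"]
    return {
        "base_model": llm["base_model"],
        "training_method": llm["training_method"],
        "lora_r": lora["r"],
        "lora_alpha": lora["lora_alpha"],
        "target_modules": list(lora["target_modules"]),
    }


# Values copied from the LLM training manifest shown earlier.
manifest = {
    "spec": {
        "llm_config": {
            "base_model": "meta-llama/Llama-2-7b-hf",
            "training_method": "lora",
            "lora_config": {
                "r": 16,
                "lora_alpha": 32,
                "target_modules": ["q_proj", "v_proj"],
            },
        }
    }
}
config = flatten_llm_config(manifest)
print(config["lora_r"], config["lora_alpha"])  # 16 32
```

Indexing with `[...]` rather than `.get()` is deliberate: a manifest that omits a LoRA setting fails loudly with a `KeyError` at load time instead of partway through training.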

Pharma Discovery Task

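# src/biomarker_discovery_task.py
import numpy as np
from scipy import stats
from skimage import measure
import pandas as pd

from .task import BitflowTask  # base class defined in src/task.py

class BiomarkerDiscoveryTask(BitflowTask):
    def preprocess(self, input_paths):
        # Load WSI and extract patches
        wsi_data = self.load_wsi(input_paths['wsi_images'])

        # The manifest allows clinical data as .csv or .parquet
        clinical_path = input_paths['clinical_data']
        if clinical_path.endswith('.parquet'):
            clinical = pd.read_parquet(clinical_path)
        else:
            clinical = pd.read_csv(clinical_path)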

        # Extract spatial features
        features = self.extract_spatial_features(wsi_data)

        return {
            'features': features,
            'clinical': clinical,
            'wsi_metadata': wsi_data['metadata']
        }

    def execute(self, preprocessed_data):
        features = preprocessed_data['features']
        clinical = preprocessed_data['clinical']

        # Spatial analysis
        spatial_patterns = self.analyze_spatial_patterns(features)

        # Correlation with treatment response
        biomarkers = self.correlate_with_response(
            spatial_patterns,
            clinical['treatment_response']
        )

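        # Statistical validation. The threshold comes from the task config
        # (key name assumed to mirror analysis_config.statistical_significance)
        # and falls back to 0.05.
        validated = self.statistical_validation(
            biomarkers,
            significance=self.config.get('statistical_significance', 0.05)
        )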

        return {
            'biomarker_candidates': validated,
            'spatial_features': spatial_patterns,
            'statistics': self.generate_statistics(validated)
        }

    def analyze_spatial_patterns(self, features):
        """Analyze spatial relationships between cells"""
        patterns = []

        for slide_features in features:
            # Cell neighborhood analysis
            neighborhoods = self.compute_neighborhoods(slide_features)

            # Spatial statistics (Ripley's K, Moran's I)
            spatial_stats = self.compute_spatial_stats(neighborhoods)

            patterns.append({
                'neighborhoods': neighborhoods,
                'spatial_stats': spatial_stats
            })

        return patterns
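
The comment in `analyze_spatial_patterns` names Moran's I as one of the spatial statistics. For reference, a dependency-free sketch of global Moran's I follows. It assumes binary distance-band weights (two cells are neighbors when they lie within `radius` of each other), which is only one of several common weighting schemes, and it is not presented as the body of `compute_spatial_stats`.

```python
import math
from typing import Sequence, Tuple


def morans_i(
    points: Sequence[Tuple[float, float]],
    values: Sequence[float],
    radius: float,
) -> float:
    """Global Moran's I with binary weights: w_ij = 1 if i != j and dist <= radius."""
    n = len(values)
    if n != len(points):
        raise ValueError("points and values must have the same length")

    mean = sum(values) / n
    deviations = [v - mean for v in values]
    variance_sum = sum(d * d for d in deviations)

    weight_total = 0.0  # W: sum of all weights
    cross_sum = 0.0     # sum over i, j of w_ij * dev_i * dev_j
    for i in range(n):
        for j in range(n):
            if i != j and math.dist(points[i], points[j]) <= radius:
                weight_total += 1.0
                cross_sum += deviations[i] * deviations[j]

    if weight_total == 0 or variance_sum == 0:
        raise ValueError("Moran's I is undefined: no neighbors or constant values")
    return (n / weight_total) * (cross_sum / variance_sum)
```

Values near +1 indicate that similar marker intensities cluster together, values near -1 indicate a checkerboard-like alternation, and values near 0 suggest no spatial structure. The double loop is quadratic in the number of cells, so a real slide would need a spatial index.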

Package Distribution

1. Package Registry

# bitflow-registry.yaml
registry:
  url: https://registry.bitflow.ai

packages:
  - name: fedavg-resnet50-pathology
    versions:
      - version: 1.0.0
        url: s3://bitflow-packages/fedavg-resnet50-pathology-1.0.0.tar.gz
        checksum: sha256:abcd1234...

  - name: pathology-llm-finetuning
    versions:
      - version: 1.0.0
        url: s3://bitflow-packages/pathology-llm-finetuning-1.0.0.tar.gz
        requirements:
          base_model_download: true

  - name: biomarker-discovery-spatial
    versions:
      - version: 2.0.0
        url: s3://bitflow-packages/biomarker-discovery-spatial-2.0.0.tar.gz
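
The `checksum` field in the registry lets a node confirm that a downloaded archive is the one that was published. A minimal sketch of that check, assuming the `sha256:<hex>` form shown in the first entry and hypothetical helper names, could look like this:

```python
import hashlib
import hmac


def sha256_of_file(path: str, chunk_size: int = 1 << 20) -> str:
    """Hash a file in chunks so large archives are never loaded whole."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_checksum(path: str, expected: str) -> bool:
    """Compare a file against a registry checksum written as 'sha256:<hex>'."""
    algorithm, _, expected_hex = expected.partition(":")
    if algorithm != "sha256" or not expected_hex:
        raise ValueError(f"Unsupported checksum format: {expected!r}")
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(sha256_of_file(path), expected_hex.lower())
```

In this sketch an entry without a checksum cannot be verified at all, so a node would need a policy for such entries (for example, rejecting them).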

2. Package Validation (Mission Authentication)

# Mission authentication on Bitedge before accepting
class MissionAuthenticator:
    def authenticate_mission(self, mission_package: str) -> bool:
        """Verify this mission package is legitimate and safe"""
        print("📨 Incoming mission package...")

        # Verify mission source (IMF headquarters)
        manifest = self.decode_mission_briefing(mission_package)
        if not self.verify_mission_authority(manifest):
            print("⚠️ Mission source unverified. Aborting.")
            return False

        # Check agent capabilities match mission requirements
        if not self.verify_agent_equipment(manifest['spec']['requirements']):
            print("⚠️ Agent lacks required equipment. Mission declined.")
            return False

        # Security clearance
        if not self.security_clearance(mission_package):
            print("🚫 Security breach detected. Mission compromised.")
            return False

        # Mission simulation (dry run)
        if not self.simulate_mission(mission_package):
            print("❌ Mission simulation failed. Too dangerous.")
            return False

        print("✅ Mission authenticated. Good luck, agent.")
        print("⏰ This package will self-destruct after execution...")
        return True

    def secure_cleanup(self, mission_id: str):
        """Mission complete. Eliminate all traces."""
        # Placeholder steps, not implemented in this sketch:
        #   - clear sensitive data from memory
        #   - remove temporary files
        #   - wipe execution logs
        print(f"💥 Mission {mission_id} data destroyed.")
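
The equipment check in `authenticate_mission` compares the manifest's `requirements` block with what the node can offer. The sketch below shows one way that comparison might work. Everything about the node description (a dict with `python`, `has_gpu`, `memory_gb`, `storage_gb`, `gpu_memory_gb`) is an assumption for illustration, and only the `>=X.Y` Python specifier form used in the manifests is handled.

```python
import re
from typing import Any, Dict, List, Tuple

_UNITS_IN_GB = {"MB": 1 / 1024, "GB": 1.0, "TB": 1024.0}


def parse_size_gb(text: str) -> float:
    """Convert strings such as '16GB' or '512MB' to gigabytes."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*(MB|GB|TB)\s*", text, flags=re.IGNORECASE)
    if not match:
        raise ValueError(f"Unrecognized size: {text!r}")
    return float(match.group(1)) * _UNITS_IN_GB[match.group(2).upper()]


def python_satisfies(spec: str, version: Tuple[int, int]) -> bool:
    """Check a '>=X.Y' specifier against a (major, minor) version tuple."""
    match = re.fullmatch(r">=\s*(\d+)\.(\d+)", spec.strip())
    if not match:
        raise ValueError(f"Unsupported Python specifier: {spec!r}")
    return version >= (int(match.group(1)), int(match.group(2)))


def unmet_requirements(requirements: Dict[str, Any], node: Dict[str, Any]) -> List[str]:
    """Return the names of requirements the node cannot satisfy."""
    unmet: List[str] = []
    if "python" in requirements and not python_satisfies(
        requirements["python"], node["python"]
    ):
        unmet.append("python")
    # 'optional' and 'none' never block execution; only 'required' does.
    if requirements.get("gpu") == "required" and not node.get("has_gpu", False):
        unmet.append("gpu")
    for key in ("memory", "storage", "gpu_memory"):
        if key in requirements:
            needed = parse_size_gb(requirements[key])
            if node.get(f"{key}_gb", 0.0) < needed:
                unmet.append(key)
    return unmet
```

A node would accept the package only when the returned list is empty. Reporting which requirement failed keeps the "Agent lacks required equipment" message actionable.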

UI Integration (Mission Control Interface)

Flow Configuration with Mission Package Selection

interface FlowConfig {
  // ... existing config
  taskPackage: {
    name: string;
    version: string;
    source: "registry" | "custom";
    customUrl?: string;
    parameters?: Record<string, unknown>;
  };
}

// In Flow Builder UI
const TaskPackageSelector = () => {
  return (
    <div>
      <Label>Select Mission Package</Label>
      <Tabs defaultValue="registry">
        <TabsList>
          <TabsTrigger value="registry">Mission Registry</TabsTrigger>
          <TabsTrigger value="custom">Custom Mission</TabsTrigger>
        </TabsList>

        <TabsContent value="registry">
          <Select>
            <SelectTrigger>
              <SelectValue placeholder="Choose a package" />
            </SelectTrigger>
            <SelectContent>
              <SelectGroup>
                <SelectLabel>🎯 Training Missions</SelectLabel>
                <SelectItem value="pathology-llm-finetuning">
                  Operation: LLM Enhancement
                </SelectItem>
                <SelectItem value="fedavg-resnet50-pathology">
                  Operation: Federated Vision
                </SelectItem>
              </SelectGroup>
              <SelectGroup>
                <SelectLabel>🔍 Discovery Missions</SelectLabel>
                <SelectItem value="biomarker-discovery-spatial">
                  Operation: Biomarker Hunt
                </SelectItem>
                <SelectItem value="drug-response-prediction">
                  Operation: Pharma Intel
                </SelectItem>
              </SelectGroup>
              <SelectGroup>
                <SelectLabel>🛡️ Special Operations</SelectLabel>
                <SelectItem value="privacy-preserving-analysis">
                  Operation: Ghost Protocol
                </SelectItem>
              </SelectGroup>
            </SelectContent>
          </Select>
        </TabsContent>

        <TabsContent value="custom">
          <div className="space-y-4">
            <Input placeholder="Package URL (tar.gz)" />
            <Button variant="outline">
              <Upload className="h-4 w-4" />
              Upload Package
            </Button>
          </div>
        </TabsContent>
      </Tabs>
    </div>
  );
};

Key Benefits

  1. Standardized Interface: All packages implement the same base class
  2. Flexible Execution: Supports various frameworks and approaches
  3. Privacy Preserving: Code runs locally, and only aggregated results are shared
  4. Validated & Secure: Packages are validated before execution
  5. Reusable: Packages can be versioned and shared via registry
  6. Extensible: Easy to add new package types for emerging use cases