Taskpack Architecture
Overview
Taskpacks are the executable units that run on Bitedge nodes. They contain the actual code, dependencies, and configurations needed to process data locally while preserving privacy.
Taskpack Structure
taskpack/
├── manifest.yaml # Package metadata and requirements
├── requirements.txt # Python dependencies
├── Dockerfile # Container definition (optional)
├── src/
│ ├── __init__.py
│ ├── task.py # Main task entry point
│ ├── preprocessor.py # Data preprocessing logic
│ ├── trainer.py # Training logic
│ └── utils/
├── configs/
│ ├── default.yaml # Default configuration
│ └── schema.json # Configuration schema
├── tests/
│ └── test_task.py
└── README.md
Manifest Structure
# manifest.yaml
apiVersion: v1
kind: TaskPackage
metadata:
name: fedavg-resnet50-pathology
version: 1.0.0
author: BitFlow Team
description: Federated learning package for pathology image classification
spec:
type: training # training | analysis | preprocessing | inference
framework: pytorch # pytorch | tensorflow | custom
requirements:
python: ">=3.8"
gpu: required # required | optional | none
memory: 16GB
storage: 50GB
inputs:
- name: images
type: wsi # wsi | dicom | numpy | csv
format: [".svs", ".tiff", ".ndpi"]
- name: labels
type: annotations
format: [".json", ".xml"]
outputs:
- name: model_updates
type: model_weights
format: ".pth"
- name: metrics
type: metrics
format: ".json"
federated:
protocol: fedavg # fedavg | fedprox | scaffold
aggregation_compatible: true
differential_privacy: optional
Taskpack Types
1. LLM Training Package
# manifest.yaml for LLM training
apiVersion: v1
kind: TaskPackage
metadata:
name: pathology-llm-finetuning
version: 1.0.0
description: Fine-tune LLM on pathology reports and WSI descriptions
spec:
type: training
framework: transformers
requirements:
python: ">=3.9"
gpu: required
memory: 32GB
gpu_memory: 24GB # For LLM
inputs:
- name: reports
type: text
format: [".txt", ".json"]
- name: wsi_features
type: embeddings
format: ".npy"
- name: qa_pairs
type: json
format: ".json"
outputs:
- name: model_adapter # LoRA/QLoRA adapters
type: model_weights
format: ".bin"
- name: tokenizer
type: tokenizer
format: ".json"
llm_config:
base_model: "meta-llama/Llama-2-7b-hf"
training_method: "lora" # lora | qlora | full
lora_config:
r: 16
lora_alpha: 32
target_modules: ["q_proj", "v_proj"]
2. Pharma Discovery Package
# manifest.yaml for pharma biomarker discovery
apiVersion: v1
kind: TaskPackage
metadata:
name: biomarker-discovery-spatial
version: 2.0.0
description: Spatial biomarker discovery for drug response prediction
spec:
type: analysis
framework: custom
requirements:
python: ">=3.8"
gpu: optional
memory: 32GB
inputs:
- name: wsi_images
type: wsi
format: [".svs", ".tiff"]
- name: clinical_data
type: structured
format: [".csv", ".parquet"]
- name: treatment_response
type: structured
format: ".json"
outputs:
- name: biomarker_candidates
type: analysis_results
format: ".json"
- name: spatial_features
type: features
format: ".h5"
- name: statistical_report
type: report
format: [".pdf", ".html"]
analysis_config:
methods:
- spatial_statistics
- cell_neighborhood_analysis
- molecular_correlation
privacy_preserving: true
statistical_significance: 0.05
Task Entry Point Interface
# src/task.py - Standard interface all packages must implement
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
class BitflowTask(ABC):
    """Base class for all BitFlow tasks.

    Concrete taskpacks subclass this and implement the four pipeline
    stages (validate -> preprocess -> execute -> postprocess); ``run``
    drives them in order and persists the declared outputs.
    """

    def __init__(self, config: Dict[str, Any]):
        """Store the runtime configuration injected by the node.

        Args:
            config: Node-supplied settings. Only 'node_id', 'task_id'
                and 'round' are read here; the rest is left for
                subclasses to interpret.
        """
        self.config = config
        self.node_id = config.get('node_id')
        self.task_id = config.get('task_id')
        # Federated-learning round counter; defaults to 0 for the first
        # (or a non-federated) run.
        self.round = config.get('round', 0)

    @abstractmethod
    def validate_inputs(self, input_paths: Dict[str, str]) -> bool:
        """Validate input data meets requirements."""

    @abstractmethod
    def preprocess(self, input_paths: Dict[str, str]) -> Dict[str, Any]:
        """Preprocess data before main execution."""

    @abstractmethod
    def execute(self, preprocessed_data: Dict[str, Any]) -> Dict[str, Any]:
        """Main task execution."""

    @abstractmethod
    def postprocess(self, results: Dict[str, Any]) -> Dict[str, str]:
        """Postprocess results and prepare outputs."""

    def save_output(self, path: str, output_dir: str, name: str) -> str:
        """Copy one produced artifact into *output_dir*.

        FIX: ``run`` calls this method but the original base class never
        defined it, so any pipeline that produced outputs failed with
        AttributeError. This default copies the artifact locally;
        subclasses may override to ship outputs elsewhere (object
        storage, aggregator upload, ...).

        Args:
            path: Filesystem path of the artifact created by postprocess.
            output_dir: Destination directory, created if missing.
            name: Logical output name from the manifest (unused here;
                kept so overrides can route by name).

        Returns:
            Destination path of the copied artifact.
        """
        import shutil
        from pathlib import Path

        dest_dir = Path(output_dir)
        dest_dir.mkdir(parents=True, exist_ok=True)
        dest = dest_dir / Path(path).name
        shutil.copy2(path, dest)
        return str(dest)

    def run(self, input_paths: Dict[str, str], output_dir: str) -> Dict[str, Any]:
        """Standard execution pipeline.

        Runs validate -> preprocess -> execute -> postprocess, then
        persists every declared output.

        Raises:
            ValueError: If ``validate_inputs`` rejects the inputs.
        """
        if not self.validate_inputs(input_paths):
            raise ValueError("Input validation failed")

        preprocessed = self.preprocess(input_paths)
        results = self.execute(preprocessed)
        outputs = self.postprocess(results)

        # Persist every declared output artifact.
        for name, path in outputs.items():
            self.save_output(path, output_dir, name)

        return {
            'status': 'success',
            'outputs': outputs,
            'metrics': results.get('metrics', {})
        }

# Example Implementations
LLM Training Task
# src/llm_training_task.py
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import LoraConfig, get_peft_model, TaskType
class PathologyLLMTask(BitflowTask):
    """Fine-tunes a causal LLM on pathology reports with LoRA adapters."""

    def __init__(self, config):
        super().__init__(config)
        # Populated during training; kept as attributes so later stages
        # and overrides can reach the trained artifacts.
        self.model = None
        self.tokenizer = None

    def preprocess(self, input_paths):
        """Turn raw reports plus WSI feature files into a training dataset."""
        report_corpus = self.load_reports(input_paths['reports'])
        slide_embeddings = self.load_features(input_paths['wsi_features'])
        return {'dataset': self.create_dataset(report_corpus, slide_embeddings)}

    def execute(self, preprocessed_data):
        """Attach LoRA adapters to the base model and run training."""
        # Adapter hyperparameters come straight from the task config.
        adapter_cfg = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            r=self.config['lora_r'],
            lora_alpha=self.config['lora_alpha'],
            target_modules=self.config['target_modules'],
        )
        # 8-bit base weights keep the footprint within the manifest's
        # gpu_memory budget — presumably why load_in_8bit is set; confirm.
        backbone = AutoModelForCausalLM.from_pretrained(
            self.config['base_model'],
            load_in_8bit=True,
            device_map="auto",
        )
        peft_model = get_peft_model(backbone, adapter_cfg)

        trainer = self.create_trainer(peft_model, preprocessed_data['dataset'])
        trainer.train()

        return {'model': peft_model, 'metrics': trainer.state.log_history}

# Pharma Discovery Task
# src/biomarker_discovery_task.py
import numpy as np
from scipy import stats
from skimage import measure
import pandas as pd
class BiomarkerDiscoveryTask(BitflowTask):
    """Spatial biomarker discovery for drug-response prediction.

    Extracts spatial features from whole-slide images, correlates them
    with clinical treatment response, and statistically validates
    candidate biomarkers.
    """

    def preprocess(self, input_paths):
        """Load WSIs and the clinical table, then extract spatial features.

        Args:
            input_paths: Maps manifest input names ('wsi_images',
                'clinical_data', ...) to local file paths.
        """
        wsi_data = self.load_wsi(input_paths['wsi_images'])
        # FIX: the manifest declares clinical_data as [".csv", ".parquet"]
        # but the original code only ever called pd.read_csv; dispatch on
        # the file suffix instead.
        clinical = self._load_clinical(input_paths['clinical_data'])
        features = self.extract_spatial_features(wsi_data)
        return {
            'features': features,
            'clinical': clinical,
            'wsi_metadata': wsi_data['metadata']
        }

    @staticmethod
    def _load_clinical(path):
        """Read the clinical table from CSV or Parquet, per the manifest."""
        if str(path).lower().endswith('.parquet'):
            return pd.read_parquet(path)
        return pd.read_csv(path)

    def execute(self, preprocessed_data):
        """Run spatial analysis, response correlation and validation."""
        features = preprocessed_data['features']
        clinical = preprocessed_data['clinical']
        spatial_patterns = self.analyze_spatial_patterns(features)
        biomarkers = self.correlate_with_response(
            spatial_patterns,
            clinical['treatment_response']
        )
        # Threshold matches statistical_significance in analysis_config.
        validated = self.statistical_validation(biomarkers,
                                                significance=0.05)
        return {
            'biomarker_candidates': validated,
            'spatial_features': spatial_patterns,
            'statistics': self.generate_statistics(validated)
        }

    def analyze_spatial_patterns(self, features):
        """Analyze spatial relationships between cells.

        Returns one dict per slide holding its neighborhood structure and
        spatial statistics (Ripley's K, Moran's I).
        """
        patterns = []
        for slide_features in features:
            neighborhoods = self.compute_neighborhoods(slide_features)
            spatial_stats = self.compute_spatial_stats(neighborhoods)
            patterns.append({
                'neighborhoods': neighborhoods,
                'spatial_stats': spatial_stats
            })
        return patterns

# Package Distribution
1. Package Registry
# bitflow-registry.yaml
registry:
url: https://registry.bitflow.ai
packages:
- name: fedavg-resnet50-pathology
versions:
- version: 1.0.0
url: s3://bitflow-packages/fedavg-resnet50-pathology-1.0.0.tar.gz
checksum: sha256:abcd1234...
- name: pathology-llm-finetuning
versions:
- version: 1.0.0
url: s3://bitflow-packages/pathology-llm-finetuning-1.0.0.tar.gz
requirements:
base_model_download: true
- name: biomarker-discovery-spatial
versions:
- version: 2.0.0
url: s3://bitflow-packages/biomarker-discovery-spatial-2.0.0.tar.gz
2. Package Validation (Mission Authentication)
# Mission authentication on Bitedge before accepting
class MissionAuthenticator:
    """Validates an incoming taskpack before the node agrees to run it."""

    def authenticate_mission(self, mission_package: str) -> bool:
        """Verify this mission package is legitimate and safe."""
        print("📨 Incoming mission package...")

        # Decode once up front; the first two gates inspect the manifest.
        manifest = self.decode_mission_briefing(mission_package)

        # Gates run lazily and in order — a failed gate prints its
        # message and short-circuits everything after it.
        gates = (
            (lambda: self.verify_mission_authority(manifest),
             "⚠️ Mission source unverified. Aborting."),
            (lambda: self.verify_agent_equipment(manifest['spec']['requirements']),
             "⚠️ Agent lacks required equipment. Mission declined."),
            (lambda: self.security_clearance(mission_package),
             "🚫 Security breach detected. Mission compromised."),
            (lambda: self.simulate_mission(mission_package),
             "❌ Mission simulation failed. Too dangerous."),
        )
        for gate, failure_message in gates:
            if not gate():
                print(failure_message)
                return False

        print("✅ Mission authenticated. Good luck, agent.")
        print("⏰ This package will self-destruct after execution...")
        return True

    def secure_cleanup(self, mission_id: str):
        """Mission complete. Eliminate all traces."""
        # Intended teardown: scrub sensitive memory, delete temporary
        # files, wipe execution logs.
        print(f"💥 Mission {mission_id} data destroyed.")

# UI Integration (Mission Control Interface)
Flow Configuration with Mission Package Selection
// Per-flow selection of which taskpack the flow executes and where it
// comes from (the shared registry or a user-supplied archive).
interface FlowConfig {
// ... existing config
taskPackage: {
name: string;
version: string;
// "registry" pulls a published package; "custom" uses customUrl below.
source: "registry" | "custom";
// Only meaningful when source === "custom".
customUrl?: string;
// Free-form parameters forwarded to the package at launch.
parameters?: Record<string, any>;
};
}
// In Flow Builder UI
// Mission-control picker: lets a flow author choose a taskpack either
// from the curated registry (grouped by mission type) or via a custom
// archive URL / upload. Selected value feeds FlowConfig.taskPackage.
const TaskPackageSelector = () => {
return (
<div>
<Label>Select Mission Package</Label>
{/* Two tabs mirror FlowConfig.taskPackage.source: registry | custom */}
<Tabs defaultValue="registry">
<TabsList>
<TabsTrigger value="registry">Mission Registry</TabsTrigger>
<TabsTrigger value="custom">Custom Mission</TabsTrigger>
</TabsList>
{/* Registry tab: item values match package names in bitflow-registry.yaml */}
<TabsContent value="registry">
<Select>
<SelectTrigger>
<SelectValue placeholder="Choose a package" />
</SelectTrigger>
<SelectContent>
<SelectGroup>
<SelectLabel>🎯 Training Missions</SelectLabel>
<SelectItem value="pathology-llm-finetuning">
Operation: LLM Enhancement
</SelectItem>
<SelectItem value="fedavg-resnet50-pathology">
Operation: Federated Vision
</SelectItem>
</SelectGroup>
<SelectGroup>
<SelectLabel>🔍 Discovery Missions</SelectLabel>
<SelectItem value="biomarker-discovery-spatial">
Operation: Biomarker Hunt
</SelectItem>
<SelectItem value="drug-response-prediction">
Operation: Pharma Intel
</SelectItem>
</SelectGroup>
<SelectGroup>
<SelectLabel>🛡️ Special Operations</SelectLabel>
<SelectItem value="privacy-preserving-analysis">
Operation: Ghost Protocol
</SelectItem>
</SelectGroup>
</SelectContent>
</Select>
</TabsContent>
{/* Custom tab: NOTE(review) — inputs are not wired to state here; confirm upstream */}
<TabsContent value="custom">
<div className="space-y-4">
<Input placeholder="Package URL (tar.gz)" />
<Button variant="outline">
<Upload className="h-4 w-4 " />
Upload Package
</Button>
</div>
</TabsContent>
</Tabs>
</div>
);
};
Key Benefits
- Standardized Interface: All packages implement the same base class
- Flexible Execution: Supports various frameworks and approaches
- Privacy Preserving: Code runs locally, only aggregated results shared
- Validated & Secure: Packages are validated before execution
- Reusable: Packages can be versioned and shared via registry
- Extensible: Easy to add new package types for emerging use cases
Last updated