
Taskpack Architecture

Overview

A taskpack is the executable unit that runs on a Bitedge node. It bundles the code, dependencies, and configuration needed to process data locally while preserving privacy.

Taskpack Structure

```
taskpack/
├── manifest.yaml          # Package metadata and requirements
├── requirements.txt       # Python dependencies
├── Dockerfile             # Container definition (optional)
├── src/
│   ├── __init__.py
│   ├── task.py            # Main task entry point
│   ├── preprocessor.py    # Data preprocessing logic
│   ├── trainer.py         # Training logic
│   └── utils/
├── configs/
│   ├── default.yaml       # Default configuration
│   └── schema.json        # Configuration schema
├── tests/
│   └── test_task.py
└── README.md
```
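A directory in this layout is what ends up as the `tar.gz` archive the registry stores. A minimal packaging sketch (`pack_taskpack` is an illustrative helper, not a Bitedge CLI command):

```python
import pathlib
import tarfile
import tempfile

def pack_taskpack(src_dir: str, out_path: str) -> str:
    """Bundle a taskpack directory into a tar.gz archive for distribution."""
    src = pathlib.Path(src_dir)
    # The manifest is mandatory; refuse to package without it
    if not (src / "manifest.yaml").is_file():
        raise FileNotFoundError("taskpack must contain manifest.yaml at its root")
    with tarfile.open(out_path, "w:gz") as tar:
        tar.add(src, arcname=src.name)  # keep 'taskpack/...' paths inside the archive
    return out_path

# Demo: build a skeleton taskpack in a temp dir and archive it
root = pathlib.Path(tempfile.mkdtemp()) / "taskpack"
(root / "src").mkdir(parents=True)
(root / "manifest.yaml").write_text("apiVersion: v1\nkind: TaskPackage\n")
(root / "src" / "task.py").write_text("# entry point\n")

archive = pack_taskpack(str(root), str(root.parent / "taskpack-1.0.0.tar.gz"))
```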

Manifest Structure

```yaml
# manifest.yaml
apiVersion: v1
kind: TaskPackage
metadata:
  name: fedavg-resnet50-pathology
  version: 1.0.0
  author: BitFlow Team
  description: Federated learning package for pathology image classification
spec:
  type: training              # training | analysis | preprocessing | inference
  framework: pytorch          # pytorch | tensorflow | custom
  requirements:
    python: ">=3.8"
    gpu: required             # required | optional | none
    memory: 16GB
    storage: 50GB
  inputs:
    - name: images
      type: wsi               # wsi | dicom | numpy | csv
      format: [".svs", ".tiff", ".ndpi"]
    - name: labels
      type: annotations
      format: [".json", ".xml"]
  outputs:
    - name: model_updates
      type: model_weights
      format: ".pth"
    - name: metrics
      type: metrics
      format: ".json"
  federated:
    protocol: fedavg          # fedavg | fedprox | scaffold
    aggregation_compatible: true
    differential_privacy: optional
```
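A node can sanity-check a parsed manifest for required fields before accepting the package. A minimal sketch over a plain dict (the real validation would run against `configs/schema.json`; `validate_manifest` is a hypothetical helper, not part of the Bitedge API):

```python
# Minimal manifest sanity check (hypothetical helper, not the official validator).
REQUIRED_TOP_LEVEL = ("apiVersion", "kind", "metadata", "spec")
REQUIRED_SPEC = ("type", "framework", "requirements", "inputs", "outputs")

def validate_manifest(manifest: dict) -> list:
    """Return a list of problems; an empty list means the manifest looks sane."""
    problems = [f"missing field: {key}" for key in REQUIRED_TOP_LEVEL
                if key not in manifest]
    if manifest.get("kind") != "TaskPackage":
        problems.append(f"unexpected kind: {manifest.get('kind')!r}")
    spec = manifest.get("spec", {})
    problems += [f"missing spec field: {key}" for key in REQUIRED_SPEC
                 if key not in spec]
    # spec.type must be one of the documented task types (None = already reported missing)
    if spec.get("type") not in {"training", "analysis", "preprocessing", "inference", None}:
        problems.append(f"unknown spec.type: {spec.get('type')!r}")
    return problems

manifest = {
    "apiVersion": "v1",
    "kind": "TaskPackage",
    "metadata": {"name": "fedavg-resnet50-pathology", "version": "1.0.0"},
    "spec": {"type": "training", "framework": "pytorch",
             "requirements": {}, "inputs": [], "outputs": []},
}
print(validate_manifest(manifest))  # → []
```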

Taskpack Types

1. LLM Training Package

```yaml
# manifest.yaml for LLM training
apiVersion: v1
kind: TaskPackage
metadata:
  name: pathology-llm-finetuning
  version: 1.0.0
  description: Fine-tune LLM on pathology reports and WSI descriptions
spec:
  type: training
  framework: transformers
  requirements:
    python: ">=3.9"
    gpu: required
    memory: 32GB
    gpu_memory: 24GB          # For LLM
  inputs:
    - name: reports
      type: text
      format: [".txt", ".json"]
    - name: wsi_features
      type: embeddings
      format: ".npy"
    - name: qa_pairs
      type: json
      format: ".json"
  outputs:
    - name: model_adapter     # LoRA/QLoRA adapters
      type: model_weights
      format: ".bin"
    - name: tokenizer
      type: tokenizer
      format: ".json"
  llm_config:
    base_model: "meta-llama/Llama-2-7b-hf"
    training_method: "lora"   # lora | qlora | full
    lora_config:
      r: 16
      lora_alpha: 32
      target_modules: ["q_proj", "v_proj"]
```

2. Pharma Discovery Package

```yaml
# manifest.yaml for pharma biomarker discovery
apiVersion: v1
kind: TaskPackage
metadata:
  name: biomarker-discovery-spatial
  version: 2.0.0
  description: Spatial biomarker discovery for drug response prediction
spec:
  type: analysis
  framework: custom
  requirements:
    python: ">=3.8"
    gpu: optional
    memory: 32GB
  inputs:
    - name: wsi_images
      type: wsi
      format: [".svs", ".tiff"]
    - name: clinical_data
      type: structured
      format: [".csv", ".parquet"]
    - name: treatment_response
      type: structured
      format: ".json"
  outputs:
    - name: biomarker_candidates
      type: analysis_results
      format: ".json"
    - name: spatial_features
      type: features
      format: ".h5"
    - name: statistical_report
      type: report
      format: [".pdf", ".html"]
  analysis_config:
    methods:
      - spatial_statistics
      - cell_neighborhood_analysis
      - molecular_correlation
    privacy_preserving: true
    statistical_significance: 0.05
```

Task Entry Point Interface

```python
# src/task.py - Standard interface all packages must implement
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional


class BitflowTask(ABC):
    """Base class for all BitFlow tasks"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.node_id = config.get('node_id')
        self.task_id = config.get('task_id')
        self.round = config.get('round', 0)

    @abstractmethod
    def validate_inputs(self, input_paths: Dict[str, str]) -> bool:
        """Validate input data meets requirements"""
        pass

    @abstractmethod
    def preprocess(self, input_paths: Dict[str, str]) -> Dict[str, Any]:
        """Preprocess data before main execution"""
        pass

    @abstractmethod
    def execute(self, preprocessed_data: Dict[str, Any]) -> Dict[str, Any]:
        """Main task execution"""
        pass

    @abstractmethod
    def postprocess(self, results: Dict[str, Any]) -> Dict[str, str]:
        """Postprocess results and prepare outputs"""
        pass

    def run(self, input_paths: Dict[str, str], output_dir: str) -> Dict[str, Any]:
        """Standard execution pipeline"""
        # Validate
        if not self.validate_inputs(input_paths):
            raise ValueError("Input validation failed")

        # Execute pipeline
        preprocessed = self.preprocess(input_paths)
        results = self.execute(preprocessed)
        outputs = self.postprocess(results)

        # Save outputs
        for name, path in outputs.items():
            self.save_output(path, output_dir, name)

        return {
            'status': 'success',
            'outputs': outputs,
            'metrics': results.get('metrics', {})
        }
```
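The pipeline order `run()` enforces (validate → preprocess → execute → postprocess) can be seen end to end with a toy subclass. A minimal sketch: `WordCountTask` is hypothetical, and a condensed copy of the base class is inlined (without the output-saving step) so the snippet runs standalone; the real base class ships in `src/task.py`:

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class BitflowTask(ABC):
    """Condensed stand-in for the src/task.py base class (omits save_output)."""
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.round = config.get("round", 0)

    @abstractmethod
    def validate_inputs(self, input_paths): ...
    @abstractmethod
    def preprocess(self, input_paths): ...
    @abstractmethod
    def execute(self, preprocessed): ...
    @abstractmethod
    def postprocess(self, results): ...

    def run(self, input_paths: Dict[str, str], output_dir: str) -> Dict[str, Any]:
        if not self.validate_inputs(input_paths):
            raise ValueError("Input validation failed")
        results = self.execute(self.preprocess(input_paths))
        return {"status": "success",
                "outputs": self.postprocess(results),
                "metrics": results.get("metrics", {})}


class WordCountTask(BitflowTask):
    """Toy task: counts words in an in-memory 'report' instead of touching disk."""
    def validate_inputs(self, input_paths):
        return "reports" in input_paths

    def preprocess(self, input_paths):
        return {"text": input_paths["reports"]}

    def execute(self, preprocessed):
        return {"metrics": {"word_count": len(preprocessed["text"].split())}}

    def postprocess(self, results):
        return {"metrics": "metrics.json"}


result = WordCountTask({"round": 1}).run({"reports": "tumor margin clear"}, "/tmp/out")
print(result["metrics"])  # → {'word_count': 3}
```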

Example Implementations

LLM Training Task

```python
# src/llm_training_task.py
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import LoraConfig, get_peft_model, TaskType


class PathologyLLMTask(BitflowTask):
    def __init__(self, config):
        super().__init__(config)
        self.model = None
        self.tokenizer = None

    def preprocess(self, input_paths):
        # Load pathology reports and WSI descriptions
        reports = self.load_reports(input_paths['reports'])
        wsi_features = self.load_features(input_paths['wsi_features'])

        # Create training dataset
        dataset = self.create_dataset(reports, wsi_features)
        return {'dataset': dataset}

    def execute(self, preprocessed_data):
        # Initialize model with LoRA
        base_model = AutoModelForCausalLM.from_pretrained(
            self.config['base_model'],
            load_in_8bit=True,
            device_map="auto"
        )

        lora_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            r=self.config['lora_r'],
            lora_alpha=self.config['lora_alpha'],
            target_modules=self.config['target_modules']
        )
        model = get_peft_model(base_model, lora_config)

        # Train
        trainer = self.create_trainer(model, preprocessed_data['dataset'])
        trainer.train()

        return {
            'model': model,
            'metrics': trainer.state.log_history
        }
```

Pharma Discovery Task

```python
# src/biomarker_discovery_task.py
import numpy as np
from scipy import stats
from skimage import measure
import pandas as pd


class BiomarkerDiscoveryTask(BitflowTask):
    def preprocess(self, input_paths):
        # Load WSI and extract patches
        wsi_data = self.load_wsi(input_paths['wsi_images'])
        clinical = pd.read_csv(input_paths['clinical_data'])

        # Extract spatial features
        features = self.extract_spatial_features(wsi_data)
        return {
            'features': features,
            'clinical': clinical,
            'wsi_metadata': wsi_data['metadata']
        }

    def execute(self, preprocessed_data):
        features = preprocessed_data['features']
        clinical = preprocessed_data['clinical']

        # Spatial analysis
        spatial_patterns = self.analyze_spatial_patterns(features)

        # Correlation with treatment response
        biomarkers = self.correlate_with_response(
            spatial_patterns,
            clinical['treatment_response']
        )

        # Statistical validation
        validated = self.statistical_validation(biomarkers, significance=0.05)

        return {
            'biomarker_candidates': validated,
            'spatial_features': spatial_patterns,
            'statistics': self.generate_statistics(validated)
        }

    def analyze_spatial_patterns(self, features):
        """Analyze spatial relationships between cells"""
        patterns = []
        for slide_features in features:
            # Cell neighborhood analysis
            neighborhoods = self.compute_neighborhoods(slide_features)

            # Spatial statistics (Ripley's K, Moran's I)
            spatial_stats = self.compute_spatial_stats(neighborhoods)

            patterns.append({
                'neighborhoods': neighborhoods,
                'spatial_stats': spatial_stats
            })
        return patterns
```

Package Distribution

1. Package Registry

```yaml
# bitflow-registry.yaml
registry:
  url: https://registry.bitflow.ai
  packages:
    - name: fedavg-resnet50-pathology
      versions:
        - version: 1.0.0
          url: s3://bitflow-packages/fedavg-resnet50-pathology-1.0.0.tar.gz
          checksum: sha256:abcd1234...
    - name: pathology-llm-finetuning
      versions:
        - version: 1.0.0
          url: s3://bitflow-packages/pathology-llm-finetuning-1.0.0.tar.gz
          requirements:
            base_model_download: true
    - name: biomarker-discovery-spatial
      versions:
        - version: 2.0.0
          url: s3://bitflow-packages/biomarker-discovery-spatial-2.0.0.tar.gz
```
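After downloading an archive, a node should verify it against the `checksum` field of the registry entry before unpacking. A sketch over in-memory bytes (`verify_checksum` is illustrative; the actual download path lives in the node runtime):

```python
import hashlib

def verify_checksum(archive_bytes: bytes, expected: str) -> bool:
    """Compare an archive digest against a registry entry like 'sha256:abcd...'."""
    algo, _, digest = expected.partition(":")
    if algo != "sha256":
        raise ValueError(f"unsupported checksum algorithm: {algo}")
    return hashlib.sha256(archive_bytes).hexdigest() == digest

# Simulate a downloaded package and its registry entry
package = b"taskpack archive contents"
entry = "sha256:" + hashlib.sha256(package).hexdigest()

print(verify_checksum(package, entry))            # → True
print(verify_checksum(b"tampered bytes", entry))  # → False
```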

2. Package Validation (Mission Authentication)

```python
# Mission authentication on Bitedge before accepting
class MissionAuthenticator:
    def authenticate_mission(self, mission_package: str) -> bool:
        """Verify this mission package is legitimate and safe"""
        print("📨 Incoming mission package...")

        # Verify mission source (IMF headquarters)
        manifest = self.decode_mission_briefing(mission_package)
        if not self.verify_mission_authority(manifest):
            print("⚠️ Mission source unverified. Aborting.")
            return False

        # Check agent capabilities match mission requirements
        if not self.verify_agent_equipment(manifest['spec']['requirements']):
            print("⚠️ Agent lacks required equipment. Mission declined.")
            return False

        # Security clearance
        if not self.security_clearance(mission_package):
            print("🚫 Security breach detected. Mission compromised.")
            return False

        # Mission simulation (dry run)
        if not self.simulate_mission(mission_package):
            print("❌ Mission simulation failed. Too dangerous.")
            return False

        print("✅ Mission authenticated. Good luck, agent.")
        print("⏰ This package will self-destruct after execution...")
        return True

    def secure_cleanup(self, mission_id: str):
        """Mission complete. Eliminate all traces."""
        # Clear sensitive data from memory
        # Remove temporary files
        # Wipe execution logs
        print(f"💥 Mission {mission_id} data destroyed.")
```

UI Integration (Mission Control Interface)

Flow Configuration with Mission Package Selection

```tsx
interface FlowConfig {
  // ... existing config
  taskPackage: {
    name: string;
    version: string;
    source: "registry" | "custom";
    customUrl?: string;
    parameters?: Record<string, any>;
  };
}

// In Flow Builder UI
const TaskPackageSelector = () => {
  return (
    <div>
      <Label>Select Mission Package</Label>
      <Tabs defaultValue="registry">
        <TabsList>
          <TabsTrigger value="registry">Mission Registry</TabsTrigger>
          <TabsTrigger value="custom">Custom Mission</TabsTrigger>
        </TabsList>
        <TabsContent value="registry">
          <Select>
            <SelectTrigger>
              <SelectValue placeholder="Choose a package" />
            </SelectTrigger>
            <SelectContent>
              <SelectGroup>
                <SelectLabel>🎯 Training Missions</SelectLabel>
                <SelectItem value="pathology-llm-finetuning">
                  Operation: LLM Enhancement
                </SelectItem>
                <SelectItem value="fedavg-resnet50-pathology">
                  Operation: Federated Vision
                </SelectItem>
              </SelectGroup>
              <SelectGroup>
                <SelectLabel>🔍 Discovery Missions</SelectLabel>
                <SelectItem value="biomarker-discovery-spatial">
                  Operation: Biomarker Hunt
                </SelectItem>
                <SelectItem value="drug-response-prediction">
                  Operation: Pharma Intel
                </SelectItem>
              </SelectGroup>
              <SelectGroup>
                <SelectLabel>🛡️ Special Operations</SelectLabel>
                <SelectItem value="privacy-preserving-analysis">
                  Operation: Ghost Protocol
                </SelectItem>
              </SelectGroup>
            </SelectContent>
          </Select>
        </TabsContent>
        <TabsContent value="custom">
          <div className="space-y-4">
            <Input placeholder="Package URL (tar.gz)" />
            <Button variant="outline">
              <Upload className="h-4 w-4" /> Upload Package
            </Button>
          </div>
        </TabsContent>
      </Tabs>
    </div>
  );
};
```

Key Benefits

  1. Standardized Interface: All packages implement the same base class
  2. Flexible Execution: Supports various frameworks and approaches
  3. Privacy Preserving: Code runs locally, only aggregated results shared
  4. Validated & Secure: Packages are validated before execution
  5. Reusable: Packages can be versioned and shared via registry
  6. Extensible: Easy to add new package types for emerging use cases
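The privacy model above hinges on only aggregated results leaving each node. For a `fedavg`-protocol taskpack, server-side aggregation amounts to a sample-weighted average of the clients' model updates. A minimal sketch with plain floats (real deployments average tensors loaded from the `.pth` model updates):

```python
# Sample-weighted FedAvg over client model updates (plain-float sketch of the
# aggregation an orchestrator would run on tensors in practice).
def fedavg(updates):
    """updates: list of (weights, num_samples) pairs; returns the weighted average."""
    total = sum(n for _, n in updates)
    dim = len(updates[0][0])
    return [sum(w[i] * n for w, n in updates) / total for i in range(dim)]

# Two nodes: one trained on 300 local samples, one on 100
client_a = ([1.0, 2.0], 300)
client_b = ([3.0, 6.0], 100)
print(fedavg([client_a, client_b]))  # → [1.5, 3.0]
```

Because only these averaged weights (optionally with differential-privacy noise, per the manifest's `differential_privacy` flag) are shared, the raw images and reports never leave the node.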