用Python模拟无人机‘招标’:手把手教你实现合同网协议(CNP)的分布式任务分配

张开发
2026/4/18 9:19:35 15 分钟阅读

分享文章

用Python模拟无人机‘招标’:手把手教你实现合同网协议(CNP)的分布式任务分配
用Python模拟无人机‘招标’手把手教你实现合同网协议(CNP)的分布式任务分配当无人机集群需要协同完成复杂任务时如何高效分配任务成为关键挑战。想象这样一个场景三架无人机需要完成区域测绘、物资投递和实时监控三项任务每架无人机的能力和当前状态各不相同。传统集中式分配可能导致单点故障而完全随机分配又缺乏效率。这正是合同网协议(Contract Net Protocol)大显身手的时刻——它模拟人类招标流程通过智能体间的协商实现最优任务分配。本文将带你用Python从零构建一个精简但完整的CNP模拟系统。不同于单纯的理论讲解我们会聚焦于可运行的代码实现让你在编写Manager和Contractor类的过程中真正理解分布式协商的精髓。最终完成的模拟系统将包含任务发布、投标评估、合同签订等完整流程并留有充分的扩展接口供你实验更复杂的策略。1. 环境搭建与基础类设计开始编码前确保你的Python环境已安装3.8版本。我们不需要额外依赖库标准库就足够构建这个模拟系统。建议使用Jupyter Notebook或VS Code等支持交互式调试的环境方便观察智能体间的交互过程。智能体基类设计是所有CNP实现的起点。这个基类需要包含哪些核心属性至少每个智能体都需要唯一标识和状态跟踪能力class Agent: def __init__(self, name): self.name name # 智能体唯一标识 self.active_tasks [] # 当前承担的任务 self.battery 100 # 模拟电量状态 self.location (0, 0) # 二维坐标位置 def update_status(self, taskNone): 更新智能体状态执行任务时消耗电量 if task: self.active_tasks.append(task) self.battery - 10 # 简化的能耗模型这个基础框架已经包含了后续扩展所需的关键元素。注意我们特意加入了battery和location属性虽然基础流程中可能用不到但它们为后续实现基于电量或距离的评估策略埋下了伏笔。2. 实现管理者(Manager)角色管理者是CNP协议的核心协调者其核心职责可以用三个关键方法概括announce_task()- 向可用承包商广播任务evaluate_bids()- 评估收到的投标方案award_contract()- 与最优承包商签订合同以下是具体实现示例class Manager(Agent): def __init__(self, name): super().__init__(name) self.open_tasks {} # 待分配任务池 self.received_bids [] # 投标记录 def create_task(self, task_type, priority1): 生成新任务并加入待分配池 task_id ftask_{len(self.open_tasks)1} self.open_tasks[task_id] { type: task_type, priority: priority, status: pending } return task_id def announce_task(self, contractors): 向所有承包商广播最高优先级任务 if not self.open_tasks: print(No tasks available) return # 选择优先级最高的待分配任务 current_task max(self.open_tasks.items(), keylambda x: x[1][priority]) print(f[{self.name}] 发布任务: {current_task[0]}) for contractor in contractors: # 模拟异步消息传递 contractor.receive_task_announcement( self, current_task[0], self.open_tasks[current_task[0]] )特别注意announce_task方法中的承包商通知机制。在实际分布式系统中这里应该使用消息队列或RPC调用但我们的模拟采用直接方法调用简化实现。3. 承包商(Contractor)的投标逻辑承包商需要具备评估自身任务适配度的能力。我们实现一个能考虑多种因素的智能投标策略class Contractor(Agent): def __init__(self, name, specialty): super().__init__(name) self.specialty specialty # 专长领域 self.bid_strategy balanced # 可配置的投标策略 def receive_task_announcement(self, manager, task_id, task_details): 处理收到的任务公告 suitability self.assess_task_suitability(task_details) if suitability 0: # 只对合适任务投标 bid_details self.prepare_bid(manager, task_id, suitability) self.submit_bid(manager, bid_details) def assess_task_suitability(self, task_details): 综合评估任务适配度 score 0 # 专业匹配度评估 if task_details[type] in self.specialty: score 40 # 电量充足性评估 if self.battery 30: score 30 else: score - 20 # 当前负载评估 if len(self.active_tasks) 2: score 30 else: score - 10 return score def prepare_bid(self, manager, task_id, suitability): 准备投标方案 base_price 100 bid_price base_price * (1 - suitability/100) # 适配度越高报价越低 return { contractor: self.name, task_id: task_id, price: bid_price, suitability: suitability, estimated_time: 60 # 简化的预估耗时 }这个实现展示了几个关键设计决策多维度任务评估专业匹配、电量状态、当前负载动态定价策略适配度越高报价越低清晰的投标数据结构4. 完整流程模拟与可视化现在我们将各个组件组合起来模拟一个完整的CNP流程。为了更直观地观察我们会添加简单的可视化输出def simulate_cnp_scenario(): # 初始化智能体 survey_manager Manager(Survey_Coordinator) drone1 Contractor(Drone_Alpha, [survey, monitoring]) drone2 Contractor(Drone_Beta, [delivery, survey]) drone3 Contractor(Drone_Gamma, [monitoring]) # 设置无人机初始位置 drone1.location (10, 20) drone2.location (30, 40) drone3.location (50, 10) # 创建多个任务 tasks [ (survey, 3), (delivery, 2), (monitoring, 1) ] print( 任务创建阶段 ) for task_type, priority in tasks: task_id survey_manager.create_task(task_type, priority) print(f创建任务 {task_id}: {task_type} (优先级:{priority})) print(\n 任务分配阶段 ) contractors [drone1, drone2, drone3] survey_manager.announce_task(contractors) # 模拟投标过程 print(\n 投标评估阶段 ) if survey_manager.received_bids: winning_bid min(survey_manager.received_bids, keylambda x: x[price]) print(f中标者: {winning_bid[contractor]} f(报价:{winning_bid[price]})) # 通知中标者执行任务 for drone in contractors: if drone.name winning_bid[contractor]: drone.execute_task(winning_bid[task_id]) break else: print(没有收到有效投标) # 运行模拟 simulate_cnp_scenario()典型输出示例 任务创建阶段 创建任务 task_1: survey (优先级:3) 创建任务 task_2: delivery (优先级:2) 创建任务 task_3: monitoring (优先级:1) 任务分配阶段 [Survey_Coordinator] 发布任务: task_1 Drone_Alpha 投标报价: 72.0 Drone_Beta 投标报价: 76.0 投标评估阶段 中标者: Drone_Alpha (报价:72.0) Drone_Alpha 开始执行任务: task_15. 高级扩展与优化方向基础实现完成后可以考虑以下增强功能多轮投标策略class AdvancedManager(Manager): def conduct_auction(self, contractors, rounds3): 多轮投标拍卖 best_bids [] for _ in range(rounds): self.announce_task(contractors) if self.received_bids: best_bids.append(min(self.received_bids, keylambda x: x[price])) self.received_bids [] # 清空当前轮次投标 return min(best_bids, keylambda x: x[price]) if best_bids else None考虑距离因素的评估def calculate_distance(self, task_location): 计算与任务地点的欧氏距离 return ((self.location[0] - task_location[0])**2 (self.location[1] - task_location[1])**2)**0.5 def assess_task_suitability(self, task_details): 增强版适配度评估 score super().assess_task_suitability(task_details) # 距离惩罚因子 (每10单位距离减5分) distance self.calculate_distance(task_details.get(location, (0,0))) score - (distance // 10) * 5 return max(0, score) # 确保不低于0任务执行监控class MonitoredContractor(Contractor): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.task_history [] def execute_task(self, task_id): start_time time.time() super().execute_task(task_id) duration time.time() - start_time self.task_history.append({ task_id: task_id, start: start_time, duration: duration, success: True # 简化的成功假设 })这些扩展展示了CNP协议在实际应用中的灵活性。通过组合不同的策略你可以模拟出各种复杂的任务分配场景。

更多文章