From f58921ef823cfb45a3a10064aac68d1d37c464a4 Mon Sep 17 00:00:00 2001 From: dijkstra402 Date: Thu, 30 Oct 2025 16:16:53 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E6=89=A3=E7=94=B5?= =?UTF-8?q?=E5=B7=A5=E4=BD=9C=E7=AB=99=20setup()=20=E6=96=B9=E6=B3=95?= =?UTF-8?q?=E5=B9=B6=E4=BF=AE=E5=A4=8D=E6=98=BE=E7=A4=BA=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 主要改动: - ✅ 在 CoincellDeck 实现 setup() 方法(模仿 decks.py 三步配置模式) - ✅ 统一 Deck 默认尺寸为 1000x1000x900mm - ✅ 优化料盘布局:横向排列,留50mm边距 - ✅ 简化工作站 deck 创建逻辑(从30行减至1行) - ✅ 新增 create_coin_cell_deck() 便捷函数 - ✅ 修复 ClipMagazine 参数错误 - ✅ 删除约200行冗余代码 - ✅ 修复底座不显示问题 技术细节: - MaterialPlate 位置: liaopan1(50,50), liaopan2(250,50), 电池料盘(450,50) - 自动为 liaopan1 添加16个初始极片 - 支持3种 deck 创建方式 - 智能判断是否需要 setup --- .../coin_cell_assembly/YB_YH_materials.py | 976 +++++++++++++ .../coin_cell_assembly/coin_cell_assembly.py | 1224 +++++++++++++++++ 2 files changed, 2200 insertions(+) create mode 100644 unilabos/devices/workstation/coin_cell_assembly/YB_YH_materials.py create mode 100644 unilabos/devices/workstation/coin_cell_assembly/coin_cell_assembly.py diff --git a/unilabos/devices/workstation/coin_cell_assembly/YB_YH_materials.py b/unilabos/devices/workstation/coin_cell_assembly/YB_YH_materials.py new file mode 100644 index 00000000..6ea89398 --- /dev/null +++ b/unilabos/devices/workstation/coin_cell_assembly/YB_YH_materials.py @@ -0,0 +1,976 @@ +""" +纽扣电池组装工作站物料类定义 +Button Battery Assembly Station Resource Classes +""" + +from __future__ import annotations + +from collections import OrderedDict +from typing import Any, Dict, List, Optional, TypedDict, Union, cast + +from pylabrobot.resources.coordinate import Coordinate +from pylabrobot.resources.container import Container +from pylabrobot.resources.deck import Deck +from pylabrobot.resources.itemized_resource import ItemizedResource +from pylabrobot.resources.resource import Resource +from pylabrobot.resources.resource_stack import ResourceStack +from pylabrobot.resources.tip_rack import TipRack, TipSpot +from pylabrobot.resources.trash import Trash +from pylabrobot.resources.utils import create_ordered_items_2d + + +class ElectrodeSheetState(TypedDict): + diameter: float # 直径 (mm) + thickness: float # 厚度 (mm) + mass: float # 质量 (g) + material_type: str # 材料类型(正极、负极、隔膜、弹片、垫片、铝箔等) + height: float + electrolyte_name: str + data_electrolyte_code: str + open_circuit_voltage: float + assembly_pressure: float + electrolyte_volume: float + + info: Optional[str] # 附加信息 + +class ElectrodeSheet(Resource): + """极片类 - 包含正负极片、隔膜、弹片、垫片、铝箔等所有片状材料""" + + def __init__( + self, + name: str = "极片", + size_x=10, + size_y=10, + size_z=10, + category: str = "electrode_sheet", + model: Optional[str] = None, + ): + """初始化极片 + + Args: + name: 极片名称 + category: 类别 + model: 型号 + """ + super().__init__( + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + category=category, + model=model, + ) + self._unilabos_state: ElectrodeSheetState = ElectrodeSheetState( + diameter=14, + thickness=0.1, + mass=0.5, + material_type="copper", + info=None + ) + + # TODO: 这个还要不要?给self._unilabos_state赋值的? + def load_state(self, state: Dict[str, Any]) -> None: + """格式不变""" + super().load_state(state) + self._unilabos_state = state + #序列化 + def serialize_state(self) -> Dict[str, Dict[str, Any]]: + """格式不变""" + data = super().serialize_state() + data.update(self._unilabos_state) # Container自身的信息,云端物料将保存这一data,本地也通过这里的data进行读写,当前类用来表示这个物料的长宽高大小的属性,而data(state用来表示物料的内容,细节等) + return data + +# TODO: 这个应该只能放一个极片 +class MaterialHoleState(TypedDict): + diameter: int + depth: int + max_sheets: int + info: Optional[str] # 附加信息 + +class MaterialHole(Resource): + """料板洞位类""" + children: List[ElectrodeSheet] = [] + + def __init__( + self, + name: str, + size_x: float, + size_y: float, + size_z: float, + category: str = "material_hole", + **kwargs + ): + super().__init__( + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + category=category, + ) + self._unilabos_state: MaterialHoleState = MaterialHoleState( + diameter=20, + depth=10, + max_sheets=1, + info=None + ) + + def get_all_sheet_info(self): + info_list = [] + for sheet in self.children: + info_list.append(sheet._unilabos_state["info"]) + return info_list + + #这个函数函数好像没用,一般不会集中赋值质量 + def set_all_sheet_mass(self): + for sheet in self.children: + sheet._unilabos_state["mass"] = 0.5 # 示例:设置质量为0.5g + + def load_state(self, state: Dict[str, Any]) -> None: + """格式不变""" + super().load_state(state) + self._unilabos_state = state + + def serialize_state(self) -> Dict[str, Dict[str, Any]]: + """格式不变""" + data = super().serialize_state() + data.update(self._unilabos_state) # Container自身的信息,云端物料将保存这一data,本地也通过这里的data进行读写,当前类用来表示这个物料的长宽高大小的属性,而data(state用来表示物料的内容,细节等) + return data + #移动极片前先取出对象 + def get_sheet_with_name(self, name: str) -> Optional[ElectrodeSheet]: + for sheet in self.children: + if sheet.name == name: + return sheet + return None + + def has_electrode_sheet(self) -> bool: + """检查洞位是否有极片""" + return len(self.children) > 0 + + def assign_child_resource( + self, + resource: ElectrodeSheet, + location: Optional[Coordinate], + reassign: bool = True, + ): + """放置极片""" + # TODO: 这里要改,diameter找不到,加入._unilabos_state后应该没问题 + #if resource._unilabos_state["diameter"] > self._unilabos_state["diameter"]: + # raise ValueError(f"极片直径 {resource._unilabos_state['diameter']} 超过洞位直径 {self._unilabos_state['diameter']}") + #if len(self.children) >= self._unilabos_state["max_sheets"]: + # raise ValueError(f"洞位已满,无法放置更多极片") + super().assign_child_resource(resource, location, reassign) + + # 根据children的编号取物料对象。 + def get_electrode_sheet_info(self, index: int) -> ElectrodeSheet: + return self.children[index] + + + +class MaterialPlateState(TypedDict): + hole_spacing_x: float + hole_spacing_y: float + hole_diameter: float + info: Optional[str] # 附加信息 + +class MaterialPlate(ItemizedResource[MaterialHole]): + """料板类 - 4x4个洞位,每个洞位放1个极片""" + + children: List[MaterialHole] + + def __init__( + self, + name: str, + size_x: float, + size_y: float, + size_z: float, + ordered_items: Optional[Dict[str, MaterialHole]] = None, + ordering: Optional[OrderedDict[str, str]] = None, + category: str = "material_plate", + model: Optional[str] = None, + fill: bool = False + ): + """初始化料板 + + Args: + name: 料板名称 + size_x: 长度 (mm) + size_y: 宽度 (mm) + size_z: 高度 (mm) + hole_diameter: 洞直径 (mm) + hole_depth: 洞深度 (mm) + hole_spacing_x: X方向洞位间距 (mm) + hole_spacing_y: Y方向洞位间距 (mm) + number: 编号 + category: 类别 + model: 型号 + """ + self._unilabos_state: MaterialPlateState = MaterialPlateState( + hole_spacing_x=24.0, + hole_spacing_y=24.0, + hole_diameter=20.0, + info="", + ) + # 创建4x4的洞位 + # TODO: 这里要改,对应不同形状 + holes = create_ordered_items_2d( + klass=MaterialHole, + num_items_x=4, + num_items_y=4, + dx=(size_x - 4 * self._unilabos_state["hole_spacing_x"]) / 2, # 居中 + dy=(size_y - 4 * self._unilabos_state["hole_spacing_y"]) / 2, # 居中 + dz=size_z, + item_dx=self._unilabos_state["hole_spacing_x"], + item_dy=self._unilabos_state["hole_spacing_y"], + size_x = 16, + size_y = 16, + size_z = 16, + ) + if fill: + super().__init__( + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + ordered_items=holes, + category=category, + model=model, + ) + else: + super().__init__( + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + ordered_items=ordered_items, + ordering=ordering, + category=category, + model=model, + ) + + def update_locations(self): + # TODO:调多次相加 + holes = create_ordered_items_2d( + klass=MaterialHole, + num_items_x=4, + num_items_y=4, + dx=(self._size_x - 3 * self._unilabos_state["hole_spacing_x"]) / 2, # 居中 + dy=(self._size_y - 3 * self._unilabos_state["hole_spacing_y"]) / 2, # 居中 + dz=self._size_z, + item_dx=self._unilabos_state["hole_spacing_x"], + item_dy=self._unilabos_state["hole_spacing_y"], + size_x = 1, + size_y = 1, + size_z = 1, + ) + for item, original_item in zip(holes.items(), self.children): + original_item.location = item[1].location + + +class PlateSlot(ResourceStack): + """板槽位类 - 1个槽上能堆放8个板,移板只能操作最上方的板""" + + def __init__( + self, + name: str, + size_x: float, + size_y: float, + size_z: float, + max_plates: int = 8, + category: str = "plate_slot", + model: Optional[str] = None + ): + """初始化板槽位 + + Args: + name: 槽位名称 + max_plates: 最大板数量 + category: 类别 + """ + super().__init__( + name=name, + direction="z", # Z方向堆叠 + resources=[], + ) + self.max_plates = max_plates + self.category = category + + def can_add_plate(self) -> bool: + """检查是否可以添加板""" + return len(self.children) < self.max_plates + + def add_plate(self, plate: MaterialPlate) -> None: + """添加料板""" + if not self.can_add_plate(): + raise ValueError(f"槽位 {self.name} 已满,无法添加更多板") + self.assign_child_resource(plate) + + def get_top_plate(self) -> MaterialPlate: + """获取最上方的板""" + if len(self.children) == 0: + raise ValueError(f"槽位 {self.name} 为空") + return cast(MaterialPlate, self.get_top_item()) + + def take_top_plate(self) -> MaterialPlate: + """取出最上方的板""" + top_plate = self.get_top_plate() + self.unassign_child_resource(top_plate) + return top_plate + + def can_access_for_picking(self) -> bool: + """检查是否可以进行取料操作(只有最上方的板能进行取料操作)""" + return len(self.children) > 0 + + def serialize(self) -> dict: + return { + **super().serialize(), + "max_plates": self.max_plates, + } + + +class ClipMagazineHole(Container): + """子弹夹洞位类""" + + def __init__( + self, + name: str, + diameter: float, + depth: float, + max_sheets: int = 100, + category: str = "clip_magazine_hole", + ): + """初始化子弹夹洞位 + + Args: + name: 洞位名称 + diameter: 洞直径 (mm) + depth: 洞深度 (mm) + max_sheets: 最大极片数量 + category: 类别 + """ + super().__init__( + name=name, + size_x=diameter, + size_y=diameter, + size_z=depth, + category=category, + ) + self.diameter = diameter + self.depth = depth + self.max_sheets = max_sheets + self._sheets: List[ElectrodeSheet] = [] + + def can_add_sheet(self, sheet: ElectrodeSheet) -> bool: + """检查是否可以添加极片""" + return (len(self._sheets) < self.max_sheets and + sheet.diameter <= self.diameter) + + def add_sheet(self, sheet: ElectrodeSheet) -> None: + """添加极片""" + if not self.can_add_sheet(sheet): + raise ValueError(f"无法向洞位 {self.name} 添加极片") + self._sheets.append(sheet) + + def take_sheet(self) -> ElectrodeSheet: + """取出极片""" + if len(self._sheets) == 0: + raise ValueError(f"洞位 {self.name} 没有极片") + return self._sheets.pop() + + def get_sheet_count(self) -> int: + """获取极片数量""" + return len(self._sheets) + + def serialize_state(self) -> Dict[str, Any]: + return { + "sheet_count": len(self._sheets), + "sheets": [sheet.serialize() for sheet in self._sheets], + } + +# TODO: 这个要改 +class ClipMagazine(Resource): + """子弹夹类 - 有6个洞位,每个洞位放多个极片""" + + def __init__( + self, + name: str, + size_x: float, + size_y: float, + size_z: float, + hole_diameter: float = 20.0, + hole_depth: float = 50.0, + hole_spacing: float = 25.0, + max_sheets_per_hole: int = 100, + category: str = "clip_magazine", + model: Optional[str] = None, + ): + """初始化子弹夹 + + Args: + name: 子弹夹名称 + size_x: 长度 (mm) + size_y: 宽度 (mm) + size_z: 高度 (mm) + hole_diameter: 洞直径 (mm) + hole_depth: 洞深度 (mm) + hole_spacing: 洞位间距 (mm) + max_sheets_per_hole: 每个洞位最大极片数量 + category: 类别 + model: 型号 + """ + # 创建6个洞位,排成2x3布局 + holes = create_ordered_items_2d( + klass=ClipMagazineHole, + num_items_x=3, + num_items_y=2, + dx=(size_x - 2 * hole_spacing) / 2, # 居中 + dy=(size_y - hole_spacing) / 2, # 居中 + dz=size_z - 0, + item_dx=hole_spacing, + item_dy=hole_spacing, + diameter=hole_diameter, + depth=hole_depth, + ) + + super().__init__( + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + ordered_items=holes, + category=category, + model=model, + ) + + self.hole_diameter = hole_diameter + self.hole_depth = hole_depth + self.max_sheets_per_hole = max_sheets_per_hole + + def serialize(self) -> dict: + return { + **super().serialize(), + "hole_diameter": self.hole_diameter, + "hole_depth": self.hole_depth, + "max_sheets_per_hole": self.max_sheets_per_hole, + } +#是一种类型注解,不用self +class BatteryState(TypedDict): + """电池状态字典""" + diameter: float + height: float + assembly_pressure: float + electrolyte_volume: float + electrolyte_name: str + +class Battery(Resource): + """电池类 - 可容纳极片""" + children: List[ElectrodeSheet] = [] + + def __init__( + self, + name: str, + size_x=1, + size_y=1, + size_z=1, + category: str = "battery", + ): + """初始化电池 + + Args: + name: 电池名称 + diameter: 直径 (mm) + height: 高度 (mm) + max_volume: 最大容量 (μL) + barcode: 二维码编号 + category: 类别 + model: 型号 + """ + super().__init__( + name=name, + size_x=1, + size_y=1, + size_z=1, + category=category, + ) + self._unilabos_state: BatteryState = BatteryState( + diameter = 1.0, + height = 1.0, + assembly_pressure = 1.0, + electrolyte_volume = 1.0, + electrolyte_name = "DP001" + ) + + def add_electrolyte_with_bottle(self, bottle: Bottle) -> bool: + to_add_name = bottle._unilabos_state["electrolyte_name"] + if bottle.aspirate_electrolyte(10): + if self.add_electrolyte(to_add_name, 10): + pass + else: + bottle._unilabos_state["electrolyte_volume"] += 10 + + def set_electrolyte(self, name: str, volume: float) -> None: + """设置电解液信息""" + self._unilabos_state["electrolyte_name"] = name + self._unilabos_state["electrolyte_volume"] = volume + #这个应该没用,不会有加了后再加的事情 + def add_electrolyte(self, name: str, volume: float) -> bool: + """添加电解液信息""" + if name != self._unilabos_state["electrolyte_name"]: + return False + self._unilabos_state["electrolyte_volume"] += volume + + def load_state(self, state: Dict[str, Any]) -> None: + """格式不变""" + super().load_state(state) + self._unilabos_state = state + + def serialize_state(self) -> Dict[str, Dict[str, Any]]: + """格式不变""" + data = super().serialize_state() + data.update(self._unilabos_state) # Container自身的信息,云端物料将保存这一data,本地也通过这里的data进行读写,当前类用来表示这个物料的长宽高大小的属性,而data(state用来表示物料的内容,细节等) + return data + +# 电解液作为属性放进去 + +class BatteryPressSlotState(TypedDict): + """电池状态字典""" + diameter: float =20.0 + depth: float = 4.0 + +class BatteryPressSlot(Resource): + """电池压制槽类 - 设备,可容纳一个电池""" + children: List[Battery] = [] + + def __init__( + self, + name: str = "BatteryPressSlot", + category: str = "battery_press_slot", + ): + """初始化电池压制槽 + + Args: + name: 压制槽名称 + diameter: 直径 (mm) + depth: 深度 (mm) + category: 类别 + model: 型号 + """ + super().__init__( + name=name, + size_x=10, + size_y=12, + size_z=13, + category=category, + ) + self._unilabos_state: BatteryPressSlotState = BatteryPressSlotState() + + def has_battery(self) -> bool: + """检查是否有电池""" + return len(self.children) > 0 + + def load_state(self, state: Dict[str, Any]) -> None: + """格式不变""" + super().load_state(state) + self._unilabos_state = state + + def serialize_state(self) -> Dict[str, Dict[str, Any]]: + """格式不变""" + data = super().serialize_state() + data.update(self._unilabos_state) # Container自身的信息,云端物料将保存这一data,本地也通过这里的data进行读写,当前类用来表示这个物料的长宽高大小的属性,而data(state用来表示物料的内容,细节等) + return data + + def assign_child_resource( + self, + resource: Battery, + location: Optional[Coordinate], + reassign: bool = True, + ): + """放置极片""" + # TODO: 让高京看下槽位只有一个电池时是否这么写。 + if self.has_battery(): + raise ValueError(f"槽位已含有一个电池,无法再放置其他电池") + super().assign_child_resource(resource, location, reassign) + + # 根据children的编号取物料对象。 + def get_battery_info(self, index: int) -> Battery: + return self.children[0] + +# TODO:这个移液枪架子看一下从哪继承 +class TipBox64State(TypedDict): + """电池状态字典""" + tip_diameter: float = 5.0 + tip_length: float = 50.0 + with_tips: bool = True + +class TipBox64(TipRack): + """64孔枪头盒类""" + + children: List[TipSpot] = [] + def __init__( + self, + name: str, + size_x: float = 127.8, + size_y: float = 85.5, + size_z: float = 60.0, + category: str = "tip_box_64", + model: Optional[str] = None, + ): + """初始化64孔枪头盒 + + Args: + name: 枪头盒名称 + size_x: 长度 (mm) + size_y: 宽度 (mm) + size_z: 高度 (mm) + tip_diameter: 枪头直径 (mm) + tip_length: 枪头长度 (mm) + category: 类别 + model: 型号 + with_tips: 是否带枪头 + """ + from pylabrobot.resources.tip import Tip + + # 创建8x8=64个枪头位 + def make_tip(): + return Tip( + has_filter=False, + total_tip_length=20.0, + maximal_volume=1000, # 1mL + fitting_depth=8.0, + ) + + tip_spots = create_ordered_items_2d( + klass=TipSpot, + num_items_x=8, + num_items_y=8, + dx=8.0, + dy=8.0, + dz=0.0, + item_dx=9.0, + item_dy=9.0, + size_x=10, + size_y=10, + size_z=0.0, + make_tip=make_tip, + ) + self._unilabos_state: WasteTipBoxstate = WasteTipBoxstate() + super().__init__( + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + ordered_items=tip_spots, + category=category, + model=model, + with_tips=True, + ) + + + +class WasteTipBoxstate(TypedDict): + """"废枪头盒状态字典""" + max_tips: int = 100 + tip_count: int = 0 + +#枪头不是一次性的(同一溶液则反复使用),根据寄存器判断 +class WasteTipBox(Trash): + """废枪头盒类 - 100个枪头容量""" + + def __init__( + self, + name: str, + size_x: float = 127.8, + size_y: float = 85.5, + size_z: float = 60.0, + category: str = "waste_tip_box", + model: Optional[str] = None, + ): + """初始化废枪头盒 + + Args: + name: 废枪头盒名称 + size_x: 长度 (mm) + size_y: 宽度 (mm) + size_z: 高度 (mm) + max_tips: 最大枪头容量 + category: 类别 + model: 型号 + """ + super().__init__( + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + category=category, + model=model, + ) + self._unilabos_state: WasteTipBoxstate = WasteTipBoxstate() + + def add_tip(self) -> None: + """添加废枪头""" + if self._unilabos_state["tip_count"] >= self._unilabos_state["max_tips"]: + raise ValueError(f"废枪头盒 {self.name} 已满") + self._unilabos_state["tip_count"] += 1 + + def get_tip_count(self) -> int: + """获取枪头数量""" + return self._unilabos_state["tip_count"] + + def empty(self) -> None: + """清空废枪头盒""" + self._unilabos_state["tip_count"] = 0 + + + def load_state(self, state: Dict[str, Any]) -> None: + """格式不变""" + super().load_state(state) + self._unilabos_state = state + + def serialize_state(self) -> Dict[str, Dict[str, Any]]: + """格式不变""" + data = super().serialize_state() + data.update(self._unilabos_state) # Container自身的信息,云端物料将保存这一data,本地也通过这里的data进行读写,当前类用来表示这个物料的长宽高大小的属性,而data(state用来表示物料的内容,细节等) + return data + + +class BottleRackState(TypedDict): + """ bottle_diameter: 瓶子直径 (mm) + bottle_height: 瓶子高度 (mm) + position_spacing: 位置间距 (mm)""" + bottle_diameter: float + bottle_height: float + name_to_index: dict + + + +class BottleRack(Resource): + """瓶架类 - 12个待配位置+12个已配位置""" + children: List[Bottle] = [] + + def __init__( + self, + name: str, + size_x: float, + size_y: float, + size_z: float, + category: str = "bottle_rack", + model: Optional[str] = None, + ): + """初始化瓶架 + + Args: + name: 瓶架名称 + size_x: 长度 (mm) + size_y: 宽度 (mm) + size_z: 高度 (mm) + category: 类别 + model: 型号 + """ + super().__init__( + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + category=category, + model=model, + ) + # TODO: 添加瓶位坐标映射 + self.index_to_pos = { + 0: Coordinate.zero(), + 1: Coordinate(x=1, y=2, z=3) # 添加 + } + self.name_to_index = {} + self.name_to_pos = {} + + def load_state(self, state: Dict[str, Any]) -> None: + """格式不变""" + super().load_state(state) + self._unilabos_state = state + + def serialize_state(self) -> Dict[str, Dict[str, Any]]: + """格式不变""" + data = super().serialize_state() + data.update(self._unilabos_state) # Container自身的信息,云端物料将保存这一data,本地也通过这里的data进行读写,当前类用来表示这个物料的长宽高大小的属性,而data(state用来表示物料的内容,细节等) + return data + + # TODO: 这里有些问题要重新写一下 + def assign_child_resource(self, resource: Bottle, location=Coordinate.zero(), reassign = True): + assert len(self.children) <= 12, "瓶架已满,无法添加更多瓶子" + index = len(self.children) + location = Coordinate(x=20 + (index % 4) * 15, y=20 + (index // 4) * 15, z=0) + self.name_to_pos[resource.name] = location + self.name_to_index[resource.name] = index + return super().assign_child_resource(resource, location, reassign) + + def assign_child_resource_by_index(self, resource: Bottle, index: int): + assert 0 <= index < 12, "无效的瓶子索引" + self.name_to_index[resource.name] = index + location = self.index_to_pos[index] + return super().assign_child_resource(resource, location) + + def unassign_child_resource(self, resource: Bottle): + super().unassign_child_resource(resource) + self.index_to_pos.pop(self.name_to_index.pop(resource.name, None), None) + + # def serialize(self): + # self.children.sort(key=lambda x: self.name_to_index.get(x.name, 0)) + # return super().serialize() + + +class BottleState(TypedDict): + diameter: float + height: float + electrolyte_name: str + electrolyte_volume: float + max_volume: float + +class Bottle(Resource): + """瓶子类 - 容纳电解液""" + + def __init__( + self, + name: str, + category: str = "bottle", + ): + """初始化瓶子 + + Args: + name: 瓶子名称 + diameter: 直径 (mm) + height: 高度 (mm) + max_volume: 最大体积 (μL) + barcode: 二维码 + category: 类别 + model: 型号 + """ + super().__init__( + name=name, + size_x=1, + size_y=1, + size_z=1, + category=category, + ) + self._unilabos_state: BottleState = BottleState() + + def aspirate_electrolyte(self, volume: float) -> bool: + current_volume = self._unilabos_state["electrolyte_volume"] + assert current_volume > volume, f"Cannot aspirate {volume}μL, only {current_volume}μL available." + self._unilabos_state["electrolyte_volume"] -= volume + return True + + def load_state(self, state: Dict[str, Any]) -> None: + """格式不变""" + super().load_state(state) + self._unilabos_state = state + + def serialize_state(self) -> Dict[str, Dict[str, Any]]: + """格式不变""" + data = super().serialize_state() + data.update(self._unilabos_state) # Container自身的信息,云端物料将保存这一data,本地也通过这里的data进行读写,当前类用来表示这个物料的长宽高大小的属性,而data(state用来表示物料的内容,细节等) + return data + +class CoincellDeck(Deck): + """纽扣电池组装工作站台面类""" + + def __init__( + self, + name: str = "coin_cell_deck", + size_x: float = 1000.0, # 1m + size_y: float = 1000.0, # 1m + size_z: float = 900.0, # 0.9m + origin: Coordinate = Coordinate(0, 0, 0), + category: str = "coin_cell_deck", + setup: bool = False, # 是否自动执行 setup + ): + """初始化纽扣电池组装工作站台面 + + Args: + name: 台面名称 + size_x: 长度 (mm) - 1m + size_y: 宽度 (mm) - 1m + size_z: 高度 (mm) - 0.9m + origin: 原点坐标 + category: 类别 + setup: 是否自动执行 setup 配置标准布局 + """ + super().__init__( + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + origin=origin, + category=category, + ) + if setup: + self.setup() + + def setup(self) -> None: + """设置工作站的标准布局 - 包含3个料盘""" + # 步骤 1: 创建所有料盘 + self.plates = { + "liaopan1": MaterialPlate( + name="liaopan1", + size_x=120.8, + size_y=120.5, + size_z=10.0, + fill=True + ), + "liaopan2": MaterialPlate( + name="liaopan2", + size_x=120.8, + size_y=120.5, + size_z=10.0, + fill=True + ), + "电池料盘": MaterialPlate( + name="电池料盘", + size_x=120.8, + size_y=160.5, + size_z=10.0, + fill=True + ), + } + + # 步骤 2: 定义料盘在 deck 上的位置 + # Deck 尺寸: 1000×1000mm,料盘尺寸: 120.8×120.5mm 或 120.8×160.5mm + self.plate_locations = { + "liaopan1": Coordinate(x=50, y=50, z=0), # 左上角,留 50mm 边距 + "liaopan2": Coordinate(x=250, y=50, z=0), # 中间,liaopan1 右侧 + "电池料盘": Coordinate(x=450, y=50, z=0), # 右侧 + } + + # 步骤 3: 将料盘分配到 deck 上 + for plate_name, plate in self.plates.items(): + self.assign_child_resource( + plate, + location=self.plate_locations[plate_name] + ) + + # 步骤 4: 为 liaopan1 添加初始极片 + for i in range(16): + jipian = ElectrodeSheet( + name=f"jipian1_{i}", + size_x=12, + size_y=12, + size_z=0.1 + ) + self.plates["liaopan1"].children[i].assign_child_resource( + jipian, + location=None + ) + + +def create_coin_cell_deck(name: str = "coin_cell_deck", size_x: float = 1000.0, size_y: float = 1000.0, size_z: float = 900.0) -> CoincellDeck: + """创建并配置标准的纽扣电池组装工作站台面 + + Args: + name: 台面名称 + size_x: 长度 (mm) + size_y: 宽度 (mm) + size_z: 高度 (mm) + + Returns: + 已配置好的 CoincellDeck 对象 + """ + deck = CoincellDeck(name=name, size_x=size_x, size_y=size_y, size_z=size_z) + deck.setup() + return deck \ No newline at end of file diff --git a/unilabos/devices/workstation/coin_cell_assembly/coin_cell_assembly.py b/unilabos/devices/workstation/coin_cell_assembly/coin_cell_assembly.py new file mode 100644 index 00000000..cf06f704 --- /dev/null +++ b/unilabos/devices/workstation/coin_cell_assembly/coin_cell_assembly.py @@ -0,0 +1,1224 @@ + +import csv +import inspect +import json +import os +import threading +import time +import types +from datetime import datetime +from typing import Any, Dict, Optional +from functools import wraps +from pylabrobot.resources import Deck, Resource as PLRResource +from unilabos_msgs.msg import Resource +from unilabos.device_comms.modbus_plc.client import ModbusTcpClient +from unilabos.devices.workstation.workstation_base import WorkstationBase +from unilabos.device_comms.modbus_plc.client import TCPClient, ModbusNode, PLCWorkflow, ModbusWorkflow, WorkflowAction, BaseClient +from unilabos.device_comms.modbus_plc.modbus import DeviceType, Base as ModbusNodeBase, DataType, WorderOrder +from unilabos.devices.workstation.coin_cell_assembly.YB_YH_materials import * +from unilabos.ros.nodes.base_device_node import ROS2DeviceNode, BaseROS2DeviceNode +from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode +from unilabos.devices.workstation.coin_cell_assembly.YB_YH_materials import CoincellDeck +from unilabos.resources.graphio import convert_resources_to_type +from unilabos.utils.log import logger + + +def _ensure_modbus_slave_kw_alias(modbus_client): + if modbus_client is None: + return + + method_names = [ + "read_coils", + "write_coils", + "write_coil", + "read_discrete_inputs", + "read_holding_registers", + "write_register", + "write_registers", + ] + + def _wrap(func): + signature = inspect.signature(func) + has_var_kwargs = any(param.kind == param.VAR_KEYWORD for param in signature.parameters.values()) + accepts_unit = has_var_kwargs or "unit" in signature.parameters + accepts_slave = has_var_kwargs or "slave" in signature.parameters + + @wraps(func) + def _wrapped(self, *args, **kwargs): + if "slave" in kwargs and not accepts_slave: + slave_value = kwargs.pop("slave") + if accepts_unit and "unit" not in kwargs: + kwargs["unit"] = slave_value + if "unit" in kwargs and not accepts_unit: + unit_value = kwargs.pop("unit") + if accepts_slave and "slave" not in kwargs: + kwargs["slave"] = unit_value + return func(self, *args, **kwargs) + + _wrapped._has_slave_alias = True + return _wrapped + + for name in method_names: + if not hasattr(modbus_client, name): + continue + bound_method = getattr(modbus_client, name) + func = getattr(bound_method, "__func__", None) + if func is None: + continue + if getattr(func, "_has_slave_alias", False): + continue + wrapped = _wrap(func) + setattr(modbus_client, name, types.MethodType(wrapped, modbus_client)) + + +def _coerce_deck_input(deck: Any) -> Optional[Deck]: + if deck is None: + return None + + if isinstance(deck, Deck): + return deck + + if isinstance(deck, PLRResource): + return deck if isinstance(deck, Deck) else None + + candidates = None + if isinstance(deck, dict): + if "nodes" in deck and isinstance(deck["nodes"], list): + candidates = deck["nodes"] + else: + candidates = [deck] + elif isinstance(deck, list): + candidates = deck + + if candidates is None: + return None + + try: + converted = convert_resources_to_type(resources_list=candidates, resource_type=Deck) + if isinstance(converted, Deck): + return converted + if isinstance(converted, list): + for item in converted: + if isinstance(item, Deck): + return item + except Exception as exc: + logger.warning(f"deck 转换 Deck 失败: {exc}") + return None + + +#构建物料系统 + +class CoinCellAssemblyWorkstation(WorkstationBase): + def __init__( + self, + deck: Deck=None, + address: str = "172.21.32.111", + port: str = "502", + debug_mode: bool = False, + *args, + **kwargs, + ): + if deck is None and "deck" in kwargs: + deck = kwargs.pop("deck") + else: + kwargs.pop("deck", None) + + normalized_deck = _coerce_deck_input(deck) + + if deck is None and isinstance(normalized_deck, Deck): + deck = normalized_deck + + super().__init__( + #桌子 + deck=deck, + *args, + **kwargs, + ) + self.debug_mode = debug_mode + + # 如果没有传入 deck,则创建标准配置的 deck + if self.deck is None: + self.deck = CoincellDeck(size_x=1000, size_y=1000, size_z=900, setup=True) + else: + # 如果传入了 deck 但还没有 setup,可以选择是否 setup + if self.deck is not None and len(self.deck.children) == 0: + # deck 为空,执行 setup + self.deck.setup() + # 否则使用传入的 deck(可能已经配置好了) + self.deck = self.deck + + """ 连接初始化 """ + modbus_client = TCPClient(addr=address, port=port) + print("modbus_client", modbus_client) + _ensure_modbus_slave_kw_alias(modbus_client.client) + if not debug_mode: + modbus_client.client.connect() + count = 100 + while count >0: + count -=1 + if modbus_client.client.is_socket_open(): + break + time.sleep(2) + if not modbus_client.client.is_socket_open(): + raise ValueError('modbus tcp connection failed') + else: + print("测试模式,跳过连接") + + """ 工站的配置 """ + self.nodes = BaseClient.load_csv(os.path.join(os.path.dirname(__file__), 'coin_cell_assembly_a.csv')) + self.client = modbus_client.register_node_list(self.nodes) + self.success = False + self.allow_data_read = False #允许读取函数运行标志位 + self.csv_export_thread = None + self.csv_export_running = False + self.csv_export_file = None + self.coin_num_N = 0 #已组装电池数量 + #创建一个物料台面,包含两个极片板 + #self._ros_node.update_resource(self.deck) + + #ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{ + # "resources": [self.deck] + #}) + + + def post_init(self, ros_node: ROS2WorkstationNode): + self._ros_node = ros_node + #self.deck = create_a_coin_cell_deck() + ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{ + "resources": [self.deck] + }) + + # 批量操作在这里写 + async def change_hole_sheet_to_2(self, hole: MaterialHole): + hole._unilabos_state["max_sheets"] = 2 + return await self._ros_node.update_resource(hole) + + + async def fill_plate(self): + plate_1: MaterialPlate = self.deck.children[0].children[0] + #plate_1 + return await self._ros_node.update_resource(plate_1) + + #def run_assembly(self, wf_name: str, resource: PLRResource, params: str = "\{\}"): + # """启动工作流""" + # self.current_workflow_status = WorkflowStatus.RUNNING + # logger.info(f"工作站 {self.device_id} 启动工作流: {wf_name}") +# + # # TODO: 实现工作流逻辑 +# + # anode_sheet = self.deck.get_resource("anode_sheet") + + """ Action逻辑代码 """ + def _sys_start_cmd(self, cmd=None): + """设备启动命令 (可读写)""" + if cmd is not None: # 写入模式 + self.success = False + node = self.client.use_node('COIL_SYS_START_CMD') + ret = node.write(cmd) + print(ret) + self.success = True + return self.success + else: # 读取模式 + cmd_feedback, read_err = self.client.use_node('COIL_SYS_START_CMD').read(1) + return cmd_feedback[0] + + def _sys_stop_cmd(self, cmd=None): + """设备停止命令 (可读写)""" + if cmd is not None: # 写入模式 + self.success = False + node = self.client.use_node('COIL_SYS_STOP_CMD') + node.write(cmd) + self.success = True + return self.success + else: # 读取模式 + cmd_feedback, read_err = self.client.use_node('COIL_SYS_STOP_CMD').read(1) + return cmd_feedback[0] + + def _sys_reset_cmd(self, cmd=None): + """设备复位命令 (可读写)""" + if cmd is not None: + self.success = False + self.client.use_node('COIL_SYS_RESET_CMD').write(cmd) + self.success = True + return self.success + else: + cmd_feedback, read_err = self.client.use_node('COIL_SYS_RESET_CMD').read(1) + return cmd_feedback[0] + + def _sys_hand_cmd(self, cmd=None): + """手动模式命令 (可读写)""" + if cmd is not None: + self.success = False + self.client.use_node('COIL_SYS_HAND_CMD').write(cmd) + self.success = True + print("步骤0") + return self.success + else: + cmd_feedback, read_err = self.client.use_node('COIL_SYS_HAND_CMD').read(1) + return cmd_feedback[0] + + def _sys_auto_cmd(self, cmd=None): + """自动模式命令 (可读写)""" + if cmd is not None: + self.success = False + self.client.use_node('COIL_SYS_AUTO_CMD').write(cmd) + self.success = True + return self.success + else: + cmd_feedback, read_err = self.client.use_node('COIL_SYS_AUTO_CMD').read(1) + return cmd_feedback[0] + + def _sys_init_cmd(self, cmd=None): + """初始化命令 (可读写)""" + if cmd is not None: + self.success = False + self.client.use_node('COIL_SYS_INIT_CMD').write(cmd) + self.success = True + return self.success + else: + cmd_feedback, read_err = self.client.use_node('COIL_SYS_INIT_CMD').read(1) + return cmd_feedback[0] + + def _unilab_send_msg_succ_cmd(self, cmd=None): + """UNILAB发送配方完毕 (可读写)""" + if cmd is not None: + self.success = False + self.client.use_node('COIL_UNILAB_SEND_MSG_SUCC_CMD').write(cmd) + self.success = True + return self.success + else: + cmd_feedback, read_err = self.client.use_node('COIL_UNILAB_SEND_MSG_SUCC_CMD').read(1) + return cmd_feedback[0] + + def _unilab_rec_msg_succ_cmd(self, cmd=None): + """UNILAB接收测试电池数据完毕 (可读写)""" + if cmd is not None: + self.success = False + self.client.use_node('COIL_UNILAB_REC_MSG_SUCC_CMD').write(cmd) + self.success = True + return self.success + else: + cmd_feedback, read_err = self.client.use_node('COIL_UNILAB_REC_MSG_SUCC_CMD').read(1) + return cmd_feedback + + + # ====================== 命令类指令(REG_x_) ====================== + def _unilab_send_msg_electrolyte_num(self, num=None): + """UNILAB写电解液使用瓶数(可读写)""" + if num is not None: + self.success = False + ret = self.client.use_node('REG_MSG_ELECTROLYTE_NUM').write(num) + print(ret) + self.success = True + return self.success + else: + cmd_feedback, read_err = self.client.use_node('REG_MSG_ELECTROLYTE_NUM').read(1) + return cmd_feedback[0] + + def _unilab_send_msg_electrolyte_use_num(self, use_num=None): + """UNILAB写单次电解液使用瓶数(可读写)""" + if use_num is not None: + self.success = False + self.client.use_node('REG_MSG_ELECTROLYTE_USE_NUM').write(use_num) + self.success = True + return self.success + else: + return False + + def _unilab_send_msg_assembly_type(self, num=None): + """UNILAB写组装参数""" + if num is not None: + self.success = False + self.client.use_node('REG_MSG_ASSEMBLY_TYPE').write(num) + self.success = True + return self.success + else: + cmd_feedback, read_err = self.client.use_node('REG_MSG_ASSEMBLY_TYPE').read(1) + return cmd_feedback[0] + + def _unilab_send_msg_electrolyte_vol(self, vol=None): + """UNILAB写电解液吸取量参数""" + if vol is not None: + self.success = False + self.client.use_node('REG_MSG_ELECTROLYTE_VOLUME').write(vol, data_type=DataType.FLOAT32, word_order=WorderOrder.LITTLE) + self.success = True + return self.success + else: + cmd_feedback, read_err = self.client.use_node('REG_MSG_ELECTROLYTE_VOLUME').read(2, word_order=WorderOrder.LITTLE) + return cmd_feedback[0] + + def _unilab_send_msg_assembly_pressure(self, vol=None): + """UNILAB写电池压制力""" + if vol is not None: + self.success = False + self.client.use_node('REG_MSG_ASSEMBLY_PRESSURE').write(vol, data_type=DataType.FLOAT32, word_order=WorderOrder.LITTLE) + self.success = True + return self.success + else: + cmd_feedback, read_err = self.client.use_node('REG_MSG_ASSEMBLY_PRESSURE').read(2, word_order=WorderOrder.LITTLE) + return cmd_feedback[0] + + # ==================== 0905新增内容(COIL_x_STATUS) ==================== + def _unilab_send_electrolyte_bottle_num(self, num=None): + """UNILAB发送电解液瓶数完毕""" + if num is not None: + self.success = False + self.client.use_node('UNILAB_SEND_ELECTROLYTE_BOTTLE_NUM').write(num) + self.success = True + return self.success + else: + cmd_feedback, read_err = self.client.use_node('UNILAB_SEND_ELECTROLYTE_BOTTLE_NUM').read(1) + return cmd_feedback[0] + + def _unilab_rece_electrolyte_bottle_num(self, num=None): + """设备请求接受电解液瓶数""" + if num is not None: + self.success = False + self.client.use_node('UNILAB_RECE_ELECTROLYTE_BOTTLE_NUM').write(num) + self.success = True + return self.success + else: + cmd_feedback, read_err = self.client.use_node('UNILAB_RECE_ELECTROLYTE_BOTTLE_NUM').read(1) + return cmd_feedback[0] + + def _reg_msg_electrolyte_num(self, num=None): + """电解液已使用瓶数""" + if num is not None: + self.success = False + self.client.use_node('REG_MSG_ELECTROLYTE_NUM').write(num) + self.success = True + return self.success + else: + cmd_feedback, read_err = self.client.use_node('REG_MSG_ELECTROLYTE_NUM').read(1) + return cmd_feedback[0] + + def _reg_data_electrolyte_use_num(self, num=None): + """单瓶电解液完成组装数""" + if num is not None: + self.success = False + self.client.use_node('REG_DATA_ELECTROLYTE_USE_NUM').write(num) + self.success = True + return self.success + else: + cmd_feedback, read_err = self.client.use_node('REG_DATA_ELECTROLYTE_USE_NUM').read(1) + return cmd_feedback[0] + + def _unilab_send_finished_cmd(self, num=None): + """Unilab发送已知一组组装完成信号""" + if num is not None: + self.success = False + self.client.use_node('UNILAB_SEND_FINISHED_CMD').write(num) + self.success = True + return self.success + else: + cmd_feedback, read_err = self.client.use_node('UNILAB_SEND_FINISHED_CMD').read(1) + return cmd_feedback[0] + + def _unilab_rece_finished_cmd(self, num=None): + """Unilab接收已知一组组装完成信号""" + if num is not None: + self.success = False + self.client.use_node('UNILAB_RECE_FINISHED_CMD').write(num) + self.success = True + return self.success + else: + cmd_feedback, read_err = self.client.use_node('UNILAB_RECE_FINISHED_CMD').read(1) + return cmd_feedback[0] + + + + # ==================== 状态类属性(COIL_x_STATUS) ==================== + def _sys_start_status(self) -> bool: + """设备启动中( BOOL)""" + status, read_err = self.client.use_node('COIL_SYS_START_STATUS').read(1) + return status[0] + + def _sys_stop_status(self) -> bool: + """设备停止中( BOOL)""" + status, read_err = self.client.use_node('COIL_SYS_STOP_STATUS').read(1) + return status[0] + + def _sys_reset_status(self) -> bool: + """设备复位中( BOOL)""" + status, read_err = self.client.use_node('COIL_SYS_RESET_STATUS').read(1) + return status[0] + + def _sys_init_status(self) -> bool: + """设备初始化完成( BOOL)""" + status, read_err = self.client.use_node('COIL_SYS_INIT_STATUS').read(1) + return status[0] + + # 查找资源 + def modify_deck_name(self, resource_name: str): + # figure_res = self._ros_node.resource_tracker.figure_resource({"name": resource_name}) + # print(f"!!! figure_res: {type(figure_res)}") + self.deck.children[1] + return + + @property + def sys_status(self) -> str: + if self.debug_mode: + return "设备调试模式" + if self._sys_start_status(): + return "设备启动中" + elif self._sys_stop_status(): + return "设备停止中" + elif self._sys_reset_status(): + return "设备复位中" + elif self._sys_init_status(): + return "设备初始化中" + else: + return "未知状态" + + def _sys_hand_status(self) -> bool: + """设备手动模式( BOOL)""" + status, read_err = self.client.use_node('COIL_SYS_HAND_STATUS').read(1) + return status[0] + + def _sys_auto_status(self) -> bool: + """设备自动模式( BOOL)""" + status, read_err = self.client.use_node('COIL_SYS_AUTO_STATUS').read(1) + return status[0] + + @property + def sys_mode(self) -> str: + if self.debug_mode: + return "设备调试模式" + if self._sys_hand_status(): + return "设备手动模式" + elif self._sys_auto_status(): + return "设备自动模式" + else: + return "未知模式" + + @property + def request_rec_msg_status(self) -> bool: + """设备请求接受配方( BOOL)""" + if self.debug_mode: + return True + status, read_err = self.client.use_node('COIL_REQUEST_REC_MSG_STATUS').read(1) + return status[0] + + @property + def request_send_msg_status(self) -> bool: + """设备请求发送测试数据( BOOL)""" + if self.debug_mode: + return True + status, read_err = self.client.use_node('COIL_REQUEST_SEND_MSG_STATUS').read(1) + return status[0] + + # ======================= 其他属性(特殊功能) ======================== + ''' + @property + def warning_1(self) -> bool: + status, read_err = self.client.use_node('COIL_WARNING_1').read(1) + return status[0] + ''' + # ===================== 生产数据区 ====================== + + @property + def data_assembly_coin_cell_num(self) -> int: + """已完成电池数量 (INT16)""" + if self.debug_mode: + return 0 + num, read_err = self.client.use_node('REG_DATA_ASSEMBLY_COIN_CELL_NUM').read(1) + return num + + @property + def data_assembly_time(self) -> float: + """单颗电池组装时间 (秒, REAL/FLOAT32)""" + if self.debug_mode: + return 0 + time, read_err = self.client.use_node('REG_DATA_ASSEMBLY_PER_TIME').read(2, word_order=WorderOrder.LITTLE) + return time + + @property + def data_open_circuit_voltage(self) -> float: + """开路电压值 (FLOAT32)""" + if self.debug_mode: + return 0 + vol, read_err = self.client.use_node('REG_DATA_OPEN_CIRCUIT_VOLTAGE').read(2, word_order=WorderOrder.LITTLE) + return vol + + @property + def data_axis_x_pos(self) -> float: + """分液X轴当前位置 (FLOAT32)""" + if self.debug_mode: + return 0 + pos, read_err = self.client.use_node('REG_DATA_AXIS_X_POS').read(2, word_order=WorderOrder.LITTLE) + return pos + + @property + def data_axis_y_pos(self) -> float: + """分液Y轴当前位置 (FLOAT32)""" + if self.debug_mode: + return 0 + pos, read_err = self.client.use_node('REG_DATA_AXIS_Y_POS').read(2, word_order=WorderOrder.LITTLE) + return pos + + @property + def data_axis_z_pos(self) -> float: + """分液Z轴当前位置 (FLOAT32)""" + if self.debug_mode: + return 0 + pos, read_err = self.client.use_node('REG_DATA_AXIS_Z_POS').read(2, word_order=WorderOrder.LITTLE) + return pos + + @property + def data_pole_weight(self) -> float: + """当前电池正极片称重数据 (FLOAT32)""" + if self.debug_mode: + return 0 + weight, read_err = self.client.use_node('REG_DATA_POLE_WEIGHT').read(2, word_order=WorderOrder.LITTLE) + return weight + + @property + def data_assembly_pressure(self) -> int: + """当前电池压制力 (INT16)""" + if self.debug_mode: + return 0 + pressure, read_err = self.client.use_node('REG_DATA_ASSEMBLY_PRESSURE').read(1) + return pressure + + @property + def data_electrolyte_volume(self) -> int: + """当前电解液加注量 (INT16)""" + if self.debug_mode: + return 0 + vol, read_err = self.client.use_node('REG_DATA_ELECTROLYTE_VOLUME').read(1) + return vol + + @property + def data_coin_num(self) -> int: + """当前电池数量 (INT16)""" + if self.debug_mode: + return 0 + num, read_err = self.client.use_node('REG_DATA_COIN_NUM').read(1) + return num + + @property + def data_coin_cell_code(self) -> str: + """电池二维码序列号 (STRING)""" + try: + # 尝试不同的字节序读取 + code_little, read_err = self.client.use_node('REG_DATA_COIN_CELL_CODE').read(10, word_order=WorderOrder.LITTLE) + print(code_little) + clean_code = code_little[-8:][::-1] + return clean_code + except Exception as e: + print(f"读取电池二维码失败: {e}") + return "N/A" + + + @property + def data_electrolyte_code(self) -> str: + try: + # 尝试不同的字节序读取 + code_little, read_err = self.client.use_node('REG_DATA_ELECTROLYTE_CODE').read(10, word_order=WorderOrder.LITTLE) + print(code_little) + clean_code = code_little[-8:][::-1] + return clean_code + except Exception as e: + print(f"读取电解液二维码失败: {e}") + return "N/A" + + # ===================== 环境监控区 ====================== + @property + def data_glove_box_pressure(self) -> float: + """手套箱压力 (bar, FLOAT32)""" + if self.debug_mode: + return 0 + status, read_err = self.client.use_node('REG_DATA_GLOVE_BOX_PRESSURE').read(2, word_order=WorderOrder.LITTLE) + return status + + @property + def data_glove_box_o2_content(self) -> float: + """手套箱氧含量 (ppm, FLOAT32)""" + if self.debug_mode: + return 0 + value, read_err = self.client.use_node('REG_DATA_GLOVE_BOX_O2_CONTENT').read(2, word_order=WorderOrder.LITTLE) + return value + + @property + def data_glove_box_water_content(self) -> float: + """手套箱水含量 (ppm, FLOAT32)""" + if self.debug_mode: + return 0 + value, read_err = self.client.use_node('REG_DATA_GLOVE_BOX_WATER_CONTENT').read(2, word_order=WorderOrder.LITTLE) + return value + +# @property +# def data_stack_vision_code(self) -> int: +# """物料堆叠复检图片编码 (INT16)""" +# if self.debug_mode: +# return 0 +# code, read_err = self.client.use_node('REG_DATA_STACK_VISON_CODE').read(1) +# #code, _ = self.client.use_node('REG_DATA_STACK_VISON_CODE').read(1).type +# print(f"读取物料堆叠复检图片编码", {code}, "error", type(code)) +# #print(code.type) +# # print(read_err) +# return int(code) + + def func_pack_device_init(self): + #切换手动模式 + print("切换手动模式") + self._sys_hand_cmd(True) + time.sleep(1) + while (self._sys_hand_status()) == False: + print("waiting for hand_cmd") + time.sleep(1) + #设备初始化 + self._sys_init_cmd(True) + time.sleep(1) + #sys_init_status为bool值,不加括号 + while (self._sys_init_status())== False: + print("waiting for init_cmd") + time.sleep(1) + #手动按钮置回False + self._sys_hand_cmd(False) + time.sleep(1) + while (self._sys_hand_cmd()) == True: + print("waiting for hand_cmd to False") + time.sleep(1) + #初始化命令置回False + self._sys_init_cmd(False) + time.sleep(1) + while (self._sys_init_cmd()) == True: + print("waiting for init_cmd to False") + time.sleep(1) + + def func_pack_device_auto(self): + #切换自动 + print("切换自动模式") + self._sys_auto_cmd(True) + time.sleep(1) + while (self._sys_auto_status()) == False: + print("waiting for auto_status") + time.sleep(1) + #自动按钮置False + self._sys_auto_cmd(False) + time.sleep(1) + while (self._sys_auto_cmd()) == True: + print("waiting for auto_cmd") + time.sleep(1) + + def func_pack_device_start(self): + #切换自动 + print("启动") + self._sys_start_cmd(True) + time.sleep(1) + while (self._sys_start_status()) == False: + print("waiting for start_status") + time.sleep(1) + #自动按钮置False + self._sys_start_cmd(False) + time.sleep(1) + while (self._sys_start_cmd()) == True: + print("waiting for start_cmd") + time.sleep(1) + + def func_pack_send_bottle_num(self, bottle_num): + bottle_num = int(bottle_num) + #发送电解液平台数 + print("启动") + while (self._unilab_rece_electrolyte_bottle_num()) == False: + print("waiting for rece_electrolyte_bottle_num to True") + # self.client.use_node('8520').write(True) + time.sleep(1) + #发送电解液瓶数为2 + self._reg_msg_electrolyte_num(bottle_num) + time.sleep(1) + #完成信号置True + self._unilab_send_electrolyte_bottle_num(True) + time.sleep(1) + #检测到依华已接收 + while (self._unilab_rece_electrolyte_bottle_num()) == True: + print("waiting for rece_electrolyte_bottle_num to False") + time.sleep(1) + #完成信号置False + self._unilab_send_electrolyte_bottle_num(False) + time.sleep(1) + #自动按钮置False + + + # 下发参数 + #def func_pack_send_msg_cmd(self, elec_num: int, elec_use_num: int, elec_vol: float, assembly_type: int, assembly_pressure: int) -> bool: + # """UNILAB写参数""" + # while (self.request_rec_msg_status) == False: + # print("wait for res_msg") + # time.sleep(1) + # self.success = False + # self._unilab_send_msg_electrolyte_num(elec_num) + # time.sleep(1) + # self._unilab_send_msg_electrolyte_use_num(elec_use_num) + # time.sleep(1) + # self._unilab_send_msg_electrolyte_vol(elec_vol) + # time.sleep(1) + # self._unilab_send_msg_assembly_type(assembly_type) + # time.sleep(1) + # self._unilab_send_msg_assembly_pressure(assembly_pressure) + # time.sleep(1) + # self._unilab_send_msg_succ_cmd(True) + # time.sleep(1) + # self._unilab_send_msg_succ_cmd(False) + # #将允许读取标志位置True + # self.allow_data_read = True + # self.success = True + # return self.success + + def func_pack_send_msg_cmd(self, elec_use_num, elec_vol, assembly_type, assembly_pressure) -> bool: + """UNILAB写参数""" + while (self.request_rec_msg_status) == False: + print("wait for request_rec_msg_status to True") + time.sleep(1) + self.success = False + #self._unilab_send_msg_electrolyte_num(elec_num) + #设置平行样数目 + self._unilab_send_msg_electrolyte_use_num(elec_use_num) + time.sleep(1) + #发送电解液加注量 + self._unilab_send_msg_electrolyte_vol(elec_vol) + time.sleep(1) + #发送电解液组装类型 + self._unilab_send_msg_assembly_type(assembly_type) + time.sleep(1) + #发送电池压制力 + self._unilab_send_msg_assembly_pressure(assembly_pressure) + time.sleep(1) + self._unilab_send_msg_succ_cmd(True) + time.sleep(1) + while (self.request_rec_msg_status) == True: + print("wait for request_rec_msg_status to False") + time.sleep(1) + self._unilab_send_msg_succ_cmd(False) + #将允许读取标志位置True + self.allow_data_read = True + self.success = True + return self.success + + def func_pack_get_msg_cmd(self, file_path: str="D:\\coin_cell_data") -> bool: + """UNILAB读参数""" + while self.request_send_msg_status == False: + print("waiting for send_read_msg_status to True") + time.sleep(1) + data_open_circuit_voltage = self.data_open_circuit_voltage + data_pole_weight = self.data_pole_weight + data_assembly_time = self.data_assembly_time + data_assembly_pressure = self.data_assembly_pressure + data_electrolyte_volume = self.data_electrolyte_volume + data_coin_num = self.data_coin_num + data_electrolyte_code = self.data_electrolyte_code + data_coin_cell_code = self.data_coin_cell_code + print("data_open_circuit_voltage", data_open_circuit_voltage) + print("data_pole_weight", data_pole_weight) + print("data_assembly_time", data_assembly_time) + print("data_assembly_pressure", data_assembly_pressure) + print("data_electrolyte_volume", data_electrolyte_volume) + print("data_coin_num", data_coin_num) + print("data_electrolyte_code", data_electrolyte_code) + print("data_coin_cell_code", data_coin_cell_code) + #接收完信息后,读取完毕标志位置True + liaopan3 = self.deck.get_resource("\u7535\u6c60\u6599\u76d8") + #把物料解绑后放到另一盘上 + battery = ElectrodeSheet(name=f"battery_{self.coin_num_N}", size_x=14, size_y=14, size_z=2) + battery._unilabos_state = { + "electrolyte_name": data_coin_cell_code, + "data_electrolyte_code": data_electrolyte_code, + "open_circuit_voltage": data_open_circuit_voltage, + "assembly_pressure": data_assembly_pressure, + "electrolyte_volume": data_electrolyte_volume + } + liaopan3.children[self.coin_num_N].assign_child_resource(battery, location=None) + #print(jipian2.parent) + ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{ + "resources": [self.deck] + }) + + + self._unilab_rec_msg_succ_cmd(True) + time.sleep(1) + #等待允许读取标志位置False + while self.request_send_msg_status == True: + print("waiting for send_msg_status to False") + time.sleep(1) + self._unilab_rec_msg_succ_cmd(False) + time.sleep(1) + #将允许读取标志位置True + time_date = datetime.now().strftime("%Y%m%d") + #秒级时间戳用于标记每一行电池数据 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + #生成输出文件的变量 + self.csv_export_file = os.path.join(file_path, f"date_{time_date}.csv") + #将数据存入csv文件 + if not os.path.exists(self.csv_export_file): + #创建一个表头 + with open(self.csv_export_file, 'w', newline='', encoding='utf-8') as csvfile: + writer = csv.writer(csvfile) + writer.writerow([ + 'Time', 'open_circuit_voltage', 'pole_weight', + 'assembly_time', 'assembly_pressure', 'electrolyte_volume', + 'coin_num', 'electrolyte_code', 'coin_cell_code' + ]) + #立刻写入磁盘 + csvfile.flush() + #开始追加电池信息 + with open(self.csv_export_file, 'a', newline='', encoding='utf-8') as csvfile: + writer = csv.writer(csvfile) + writer.writerow([ + timestamp, data_open_circuit_voltage, data_pole_weight, + data_assembly_time, data_assembly_pressure, data_electrolyte_volume, + data_coin_num, data_electrolyte_code, data_coin_cell_code + ]) + #立刻写入磁盘 + csvfile.flush() + self.success = True + return self.success + + + + def func_pack_send_finished_cmd(self) -> bool: + """UNILAB写参数""" + while (self._unilab_rece_finished_cmd()) == False: + print("wait for rece_finished_cmd to True") + time.sleep(1) + self.success = False + self._unilab_send_finished_cmd(True) + time.sleep(1) + while (self._unilab_rece_finished_cmd()) == True: + print("wait for rece_finished_cmd to False") + time.sleep(1) + self._unilab_send_finished_cmd(False) + #将允许读取标志位置True + self.success = True + return self.success + + + + def func_allpack_cmd(self, elec_num, elec_use_num, elec_vol:int=50, assembly_type:int=7, assembly_pressure:int=4200, file_path: str="D:\\coin_cell_data") -> bool: + elec_num, elec_use_num, elec_vol, assembly_type, assembly_pressure = int(elec_num), int(elec_use_num), int(elec_vol), int(assembly_type), int(assembly_pressure) + summary_csv_file = os.path.join(file_path, "duandian.csv") + # 如果断点文件存在,先读取之前的进度 + if os.path.exists(summary_csv_file): + read_status_flag = True + with open(summary_csv_file, 'r', newline='', encoding='utf-8') as csvfile: + reader = csv.reader(csvfile) + header = next(reader) # 跳过标题行 + data_row = next(reader) # 读取数据行 + if len(data_row) >= 2: + elec_num_r = int(data_row[0]) + elec_use_num_r = int(data_row[1]) + elec_num_N = int(data_row[2]) + elec_use_num_N = int(data_row[3]) + coin_num_N = int(data_row[4]) + if elec_num_r == elec_num and elec_use_num_r == elec_use_num: + print("断点文件与当前任务匹配,继续") + else: + print("断点文件中elec_num、elec_use_num与当前任务不匹配,请检查任务下发参数或修改断点文件") + return False + print(f"从断点文件读取进度: elec_num_N={elec_num_N}, elec_use_num_N={elec_use_num_N}, coin_num_N={coin_num_N}") + + else: + read_status_flag = False + print("未找到断点文件,从头开始") + elec_num_N = 0 + elec_use_num_N = 0 + coin_num_N = 0 + for i in range(20): + print(f"剩余电解液瓶数: {elec_num}, 已组装电池数: {elec_use_num}") + print(f"剩余电解液瓶数: {type(elec_num)}, 已组装电池数: {type(elec_use_num)}") + print(f"剩余电解液瓶数: {type(int(elec_num))}, 已组装电池数: {type(int(elec_use_num))}") + + #如果是第一次运行,则进行初始化、切换自动、启动, 如果是断点重启则跳过。 + if read_status_flag == False: + pass + #初始化 + #self.func_pack_device_init() + #切换自动 + #self.func_pack_device_auto() + #启动,小车收回 + #self.func_pack_device_start() + #发送电解液瓶数量,启动搬运,多搬运没事 + #self.func_pack_send_bottle_num(elec_num) + last_i = elec_num_N + last_j = elec_use_num_N + for i in range(last_i, elec_num): + print(f"开始第{last_i+i+1}瓶电解液的组装") + #第一个循环从上次断点继续,后续循环从0开始 + j_start = last_j if i == last_i else 0 + self.func_pack_send_msg_cmd(elec_use_num-j_start, elec_vol, assembly_type, assembly_pressure) + + for j in range(j_start, elec_use_num): + print(f"开始第{last_i+i+1}瓶电解液的第{j+j_start+1}个电池组装") + #读取电池组装数据并存入csv + self.func_pack_get_msg_cmd(file_path) + time.sleep(1) + # TODO:读完再将电池数加一还是进入循环就将电池数加一需要考虑 + + + + # 生成断点文件 + # 生成包含elec_num_N、coin_num_N、timestamp的CSV文件 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + with open(summary_csv_file, 'w', newline='', encoding='utf-8') as csvfile: + writer = csv.writer(csvfile) + writer.writerow(['elec_num','elec_use_num', 'elec_num_N', 'elec_use_num_N', 'coin_num_N', 'timestamp']) + writer.writerow([elec_num, elec_use_num, elec_num_N, elec_use_num_N, coin_num_N, timestamp]) + csvfile.flush() + coin_num_N += 1 + self.coin_num_N = coin_num_N + elec_use_num_N += 1 + elec_num_N += 1 + elec_use_num_N = 0 + + #循环正常结束,则删除断点文件 + os.remove(summary_csv_file) + #全部完成后等待依华发送完成信号 + self.func_pack_send_finished_cmd() + + + def func_pack_device_stop(self) -> bool: + """打包指令:设备停止""" + for i in range(3): + time.sleep(2) + print(f"输出{i}") + #print("_sys_hand_cmd", self._sys_hand_cmd()) + #time.sleep(1) + #print("_sys_hand_status", self._sys_hand_status()) + #time.sleep(1) + #print("_sys_init_cmd", self._sys_init_cmd()) + #time.sleep(1) + #print("_sys_init_status", self._sys_init_status()) + #time.sleep(1) + #print("_sys_auto_status", self._sys_auto_status()) + #time.sleep(1) + #print("data_axis_y_pos", self.data_axis_y_pos) + #time.sleep(1) + #self.success = False + #with open('action_device_stop.json', 'r', encoding='utf-8') as f: + # action_json = json.load(f) + #self.client.execute_procedure_from_json(action_json) + #self.success = True + #return self.success + + def fun_wuliao_test(self) -> bool: + #找到data_init中构建的2个物料盘 + liaopan3 = self.deck.get_resource("\u7535\u6c60\u6599\u76d8") + for i in range(16): + battery = ElectrodeSheet(name=f"battery_{i}", size_x=16, size_y=16, size_z=2) + battery._unilabos_state = { + "diameter": 20.0, + "height": 20.0, + "assembly_pressure": i, + "electrolyte_volume": 20.0, + "electrolyte_name": f"DP{i}" + } + liaopan3.children[i].assign_child_resource(battery, location=None) + + ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{ + "resources": [self.deck] + }) + # for i in range(40): + # print(f"fun_wuliao_test 运行结束{i}") + # time.sleep(1) + # time.sleep(40) + # 数据读取与输出 + def func_read_data_and_output(self, file_path: str="D:\\coin_cell_data"): + # 检查CSV导出是否正在运行,已运行则跳出,防止同时启动两个while循环 + if self.csv_export_running: + return False, "读取已在运行中" + + #若不存在该目录则创建 + if not os.path.exists(file_path): + os.makedirs(file_path) + print(f"创建目录: {file_path}") + + # 只要允许读取标志位为true,就持续运行该函数,直到触发停止条件 + while self.allow_data_read: + + #函数运行标志位,确保只同时启动一个导出函数 + self.csv_export_running = True + + #等待接收结果标志位置True + while self.request_send_msg_status == False: + print("waiting for send_msg_status to True") + time.sleep(1) + #日期时间戳用于按天存放csv文件 + time_date = datetime.now().strftime("%Y%m%d") + #秒级时间戳用于标记每一行电池数据 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + #生成输出文件的变量 + self.csv_export_file = os.path.join(file_path, f"date_{time_date}.csv") + + #接收信息 + data_open_circuit_voltage = self.data_open_circuit_voltage + data_pole_weight = self.data_pole_weight + data_assembly_time = self.data_assembly_time + data_assembly_pressure = self.data_assembly_pressure + data_electrolyte_volume = self.data_electrolyte_volume + data_coin_num = self.data_coin_num + data_electrolyte_code = self.data_electrolyte_code + data_coin_cell_code = self.data_coin_cell_code + # 电解液瓶位置 + elec_bottle_site = 2 + # 极片夹取位置(应当通过寄存器读光标) + Pos_elec_site = 0 + Al_elec_site = 0 + Gasket_site = 0 + + #接收完信息后,读取完毕标志位置True + self._unilab_rec_msg_succ_cmd()# = True + #等待允许读取标志位置False + while self.request_send_msg_status == True: + print("waiting for send_msg_status to False") + time.sleep(1) + self._unilab_rec_msg_succ_cmd()# = False + + #此处操作物料信息(如果中途报错停止,如何) + #报错怎么办(加个判断标志位,如果发生错误,则根据停止位置扣除物料) + #根据物料光标判断取哪个物料(人工摆盘,电解液瓶,移液枪头都有光标位置,寄存器读即可) + + #物料读取操作写在这里 + #在这里进行物料调取 + #转移物料瓶,elec_bottle_site对应第几瓶电解液(从依华寄存器读取) + # transfer_bottle(deck, elec_bottle_site) + # #找到电解液瓶的对象 + # electrolyte_rack = deck.get_resource("electrolyte_rack") + # pending_positions = electrolyte_rack.get_pending_positions()[elec_bottle_site] + # # TODO: 瓶子取液体操作需要加入 +# +# + # #找到压制工站对应的对象 + # battery_press_slot = deck.get_resource("battery_press_1") + # #创建一个新电池 + # test_battery = Battery( + # name=f"test_battery_{data_coin_num}", + # diameter=20.0, # 与压制槽直径匹配 + # height=3.0, # 电池高度 + # max_volume=100.0, # 100μL容量 + # barcode=data_coin_cell_code, # 电池条码 + # ) + # if battery_press_slot.has_battery(): + # return False, "压制工站已有电池,无法放置新电池" + # #在压制位放置电池 + # battery_press_slot.place_battery(test_battery) + # #从第一个子弹夹中取料 + # clip_magazine_1_hole = self.deck.get_resource("clip_magazine_1").get_item(Pos_elec_site) + # clip_magazine_2_hole = self.deck.get_resource("clip_magazine_2").get_item(Al_elec_site) + # clip_magazine_3_hole = self.deck.get_resource("clip_magazine_3").get_item(Gasket_site) + # + # if clip_magazine_1_hole.get_sheet_count() > 0: # 检查洞位是否有极片 + # electrode_sheet_1 = clip_magazine_1_hole.take_sheet() # 从洞位取出极片 + # test_battery.add_electrode_sheet(electrode_sheet_1) # 添加到电池中 + # print(f"已将极片 {electrode_sheet_1.name} 从子弹夹转移到电池") + # else: + # print("子弹夹洞位0没有极片") +# + # if clip_magazine_2_hole.get_sheet_count() > 0: # 检查洞位是否有极片 + # electrode_sheet_2 = clip_magazine_2_hole.take_sheet() # 从洞位取出极片 + # test_battery.add_electrode_sheet(electrode_sheet_2) # 添加到电池中 + # print(f"已将极片 {electrode_sheet_2.name} 从子弹夹转移到电池") + # else: + # print("子弹夹洞位0没有极片") +# + # if clip_magazine_3_hole.get_sheet_count() > 0: # 检查洞位是否有极片 + # electrode_sheet_3 = clip_magazine_3_hole.take_sheet() # 从洞位取出极片 + # test_battery.add_electrode_sheet(electrode_sheet_3) # 添加到电池中 + # print(f"已将极片 {electrode_sheet_3.name} 从子弹夹转移到电池") + # else: + # print("子弹夹洞位0没有极片") + # + # # TODO:#把电解液从瓶中取到电池夹子中 + # battery_site = deck.get_resource("battery_press_1") + # clip_magazine_battery = deck.get_resource("clip_magazine_battery") + # if battery_site.has_battery(): + # battery = battery_site.take_battery() #从压制槽取出电池 + # clip_magazine_battery.add_battery(battery) #从压制槽取出电池 +# +# +# +# + # # 保存配置到文件 + # self.deck.save("button_battery_station_layout.json", indent=2) + # print("\n台面配置已保存到: button_battery_station_layout.json") + # + # # 保存状态到文件 + # self.deck.save_state_to_file("button_battery_station_state.json", indent=2) + # print("台面状态已保存到: button_battery_station_state.json") + + + + + + + #将数据写入csv中 + #如当前目录下无同名文件则新建一个csv用于存放数据 + if not os.path.exists(self.csv_export_file): + #创建一个表头 + with open(self.csv_export_file, 'w', newline='', encoding='utf-8') as csvfile: + writer = csv.writer(csvfile) + writer.writerow([ + 'Time', 'open_circuit_voltage', 'pole_weight', + 'assembly_time', 'assembly_pressure', 'electrolyte_volume', + 'coin_num', 'electrolyte_code', 'coin_cell_code' + ]) + #立刻写入磁盘 + csvfile.flush() + #开始追加电池信息 + with open(self.csv_export_file, 'a', newline='', encoding='utf-8') as csvfile: + writer = csv.writer(csvfile) + writer.writerow([ + timestamp, data_open_circuit_voltage, data_pole_weight, + data_assembly_time, data_assembly_pressure, data_electrolyte_volume, + data_coin_num, data_electrolyte_code, data_coin_cell_code + ]) + #立刻写入磁盘 + csvfile.flush() + + # 只要不在自动模式运行中,就将允许标志位置False + if self.sys_auto_status == False or self.sys_start_status == False: + self.allow_data_read = False + self.csv_export_running = False + time.sleep(1) + + def func_stop_read_data(self): + """停止CSV导出""" + if not self.csv_export_running: + return False, "read data未在运行" + + self.csv_export_running = False + self.allow_data_read = False + + if self.csv_export_thread and self.csv_export_thread.is_alive(): + self.csv_export_thread.join(timeout=5) + + def func_get_csv_export_status(self): + """获取CSV导出状态""" + return { + 'allow_read': self.allow_data_read, + 'running': self.csv_export_running, + 'thread_alive': self.csv_export_thread.is_alive() if self.csv_export_thread else False + } + + + ''' + # ===================== 物料管理区 ====================== + @property + def data_material_inventory(self) -> int: + """主物料库存 (数量, INT16)""" + inventory, read_err = self.client.use_node('REG_DATA_MATERIAL_INVENTORY').read(1) + return inventory + + @property + def data_tips_inventory(self) -> int: + """移液枪头库存 (数量, INT16)""" + inventory, read_err = self.client.register_node_list(self.nodes).use_node('REG_DATA_TIPS_INVENTORY').read(1) + return inventory + + ''' + + +if __name__ == "__main__": + # 简单测试 + workstation = CoinCellAssemblyWorkstation() + print(f"工作站创建成功: {workstation.deck.name}") + print(f"料盘数量: {len(workstation.deck.children)}")