mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-19 22:11:20 +00:00
Ready for open source (#47)
* Create app/main API * create example device * create ROS backend and example device SDK Wrapper * Add ROS host and host starting from app.py * Add gripper device and mock implementation * add "status_types" & "action_types" to ROS device decorator * add ActionServer debug example * [bugfix] complete mock gripper example * ROS Backend Host for Device action calling and Resource management * add conda/mamba ENV file * add host_node communication with app/main.py * add action message value mappings and converters * Update ilabos.yaml * Update issue templates * example devices.json and resources.json * Fix Device wrapper to use async property and actions (#7) * Fix Device wrapper to use async property and actions * Resolve #1 : support async get methods and actions. Give new examples. * add both sync/async GRBL controller SDK * 2 call device actions from appmainpy api to ros hostpy (#8) * feature: add job * fix:node start * feature:add get job status * fix:get device * clean * Resolve #5 device connection diagram and workflow compilation support (#9) * add syringe pump device and its compilation using device connection diagram * add RunzeSyringePump real device with ROS2 example * Prototype machine with 1 pump and 1 CNC * add ROS2ProtocolNode and related functions * add ilabos_msgs (to use PumpTransfer action) * add example device connection graph * refactor protocol_node code into separate file * add ROS2SerialNode * add SerialCommand srv in ilabos_msgs * add pump_protocol example, and fix bugs * [fix] serial service: avoid async service deadlock by directly call serial `send_command` * use SendCmd instead of SingleJointPosition for valve control * initialize device connection graph when server starts * Fix #5: async workflow execution (#16) * add rclpyx and protocol example for async-native workflow * use async in ROS2ProtocolNode, and host initialization * add examples of "ros-async" protocol implementation, and `run_in_event_loop` for using native async functions * use "ros-async" in protocols and device nodes * fix pump_protocol: push to 0.0 μL * Envs, docs, and conda recipes (#19) * update ENV: use python 3.11 and deprecate ros-humble-gazebo-ros * add ilabos-msgs conda recipe * Update ilabos.yaml * fix recipe and env yaml * Add sphinx docs * add aichemeco * add bioyong * add bioyong * Support XDL devices & protocols (#20) * [Feature] support multiple protocols in a single ProtocolNode * add Junjie's code * Support "Clean" protocol * Update Grignard_flow_batchreact_single_pumpvalve.json * test_grignard_add * add stir device node and example * Update device_node.py add print_publish flag to control the node's log output * NH4Cl_add * add "HeaterStirrer" device and "HeatChill" action * add wait time after each pump action for equilibration * fix stir * add Separate protocol * Refactor Separator device and Stir action * add rotavap_node * fix stir * add chiller node * Move rinsings into PumpTransfer * Fix SeparateProtocol under refactored Separator device and Stir action * Supports automatically add new protocol action_types * fix PumpTransfer protocol because of rinsing * Add Rotavap and Chiller devices * fix SeparateProtocol * add EvaporateProtocol * add rotavap devices config * fix HeaterStirrer and SeparatorController IO * Fix automatically add new protocol action_types * Add HeaterStirrer and SeparatorController device config * fix pump protocols * Fix Evaporate action * Update evaporate_protocol.py * add temp_sensor node and add function remap * update docs --------- Co-authored-by: 王俊杰 <1800011822@pku.edu.cn> Co-authored-by: q434343 <554662886@qq.com> * fix aichemeco * [Bugfix] fix Windows conda packaging * add file upload api * update dependencies: force to use 3.11 and remove conflict in WIN64 and OSX64 * update dependencies: force to use 3.11 and remove conflict in WIN64 and OSX64 * Create aichemeco_simple.py * fix * update * add aichemeco file * MQTT [1/2]: action start (#25) * add mq * fix * clean * add class * fix excel * update bioyong * add api * fix --------- Co-authored-by: caok@dp.tech <xiaoyeqiannian@163.com> * motor & grasp * Add Grasp motor support and enhance EleGripper class - Introduced a new motor configuration for Grasp in sjtu.json. - Updated EleGripper class to inherit from UniversalDriver and added status property. - Implemented move_and_rotate method for coordinated movement. - Adjusted threading logic in EleGripper initialization. - Registered Grasp motor in ROS2 device node configuration. This commit enhances the motor control capabilities by integrating the Grasp motor and improving the existing EleGripper functionality. * fix read data lenth * Update Grasp.py * MQTT (2/2): publish Device Status, Action Feedback & Results (#27) * Add bridges in HostNode for device_status publishing * Add "bridges" selection (fastapi & mqtt) when app start * add MQ feedback & result publisher, and fix message converter * fix UUID converting between ROS and MQ * lint api model.py * Continuous controllers: PID, MPC, custom controllers etc. (#23) * add controller config & wrapper * add controller setup at app.main * control loop example * fix com port * add agv , ur_arm and raman * MQTT (3/4): Unified Resources and Sync when starting the server (#28) * update http upload api * generate uuid when init device * example resource json * fix * add new example full-content json (device, resource, graph) * fix full-content json and related reading code * fix * add json_schema when initialize resources * fix * update schema * refactor heaterstirrer.dalong * fix * fix refactor heaterstirrer.dalong * refactor syringepump.runze: use ml instead of μL * Update ilabos/ros/host.py Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> --------- Co-authored-by: 王俊杰 <1800011822@pku.edu.cn> Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> * Distributed initialization with self-organizing network (#29) * add distributed launching option "--without_host" * fix --------- Co-authored-by: 王俊杰 <1800011822@pku.edu.cn> * Refactor Workstation: Add resource service and tracking (#30) * move ilabos/ros/rpc to ilabos/device_comms/rpc, and merge bioyond/aichemeco files under /devices * add Resource srv and message_converter * move graphio to ilabos/resources * refactor resources type conversion * add resource clients in device_node * add mock resources service * pass Gripper1 resource test * update http resource services * add AGV compile function * add AGV transfer protocol * update full mock_gripper edit_id example * update full mock_gripper edit_id example * get and update resource also in protocol_node * mock resource update in AichemecoHiwo * Create HT_hiwo.json * add children in resources * bugfixes * fix rpc * add Revvity winprep --------- Co-authored-by: wjjxxx <43375851+wjjxxx@users.noreply.github.com> Co-authored-by: 3218923350 <105201755+3218923350@users.noreply.github.com> * Distributed launch (2/2): distributed resource create (#32) * add resource_add request to host for slave mode * add AGV * fix protocol resources * optimize host callbacks * bugfixes * add revvity registry --------- Co-authored-by: 王俊杰 <1800011822@pku.edu.cn> Co-authored-by: wjjxxx <43375851+wjjxxx@users.noreply.github.com> * Refactor Driver Files Structure (#33) * Integration with pywinauto & recorder Added execute run and initialize procdure * 酶标仪状态检测、使用示例,整体流程待测试 * nivo ready version * Add HPLC driver and example script - Introduced HPLCDriver class for managing HPLC device status and operations. - Implemented device status monitoring and command execution via ROS2 actions. - Added example script (hplc.py) demonstrating how to run commands on the HPLC device. - Created PlayerUtil and UniversalDriver classes for shared functionality across devices. - Refactored NivoDriver to utilize the new UniversalDriver structure. - Enhanced error handling and process management in the NivoDriver. * 修复start的错误定位 * hplc tested ok * relative path to build msgs * template_driver & jiageng devices * fetch correct status type and action type * fix mtype fetch * gpc bus integration * ilab build * remove chs * recipe rename * modbus update 1 * json available * hplc & modbus rewrite * Update AgilentHPLC.py hplc datafile reader * move ilabos/ros/rpc to ilabos/device_comms/rpc, and merge bioyond/aichemeco files under /devices * modbus分设备 * gpc * gpc 2 * fix address * default register node * fix MainScreenGPC * add Resource srv and message_converter * move graphio to ilabos/resources * refactor resources type conversion * add resource clients in device_node * add mock resources service * pass Gripper1 resource test * update http resource services * add AGV compile function * add AGV transfer protocol * update recipe.yaml * update full mock_gripper edit_id example * update full mock_gripper edit_id example * get and update resource also in protocol_node * mock resource update in AichemecoHiwo * feat: add other jiageng PLC device code * ilabos compile * correct format * correct recipe format * correct setup.py format * remove unnecessary files * remove unnecessary files * Create HT_hiwo.json * add children in resources * hplc support sample_id * correct hplc sample_id * correct hplc sample_id * hplc upload * fix type hint * oss upload tested ver * recipe yaml fix for linux * update installation yaml * refactor: moved all driver files according to its feat * merge main to dev --------- Co-authored-by: 王俊杰 <2201110460@stu.pku.edu.cn> Co-authored-by: Junhan Chang <changjh@pku.edu.cn> Co-authored-by: jiawei <miaojiawei@dp.tech> * add: NMR LH and RU device control (#34) * Add Registry for device drivers and Support GraphML (#35) * read chemputer graphml * read graphml in app/main * add devices in ros/devices * add schema for devices * read registry directory and initialize when entry from main * Delete devices.py * Update add_protocol.md * delete unecessary files * feat: 2278 devices registry yaml (#36) * read chemputer graphml * read graphml in app/main * add devices in ros/devices * add schema for devices * read registry directory and initialize when entry from main * Delete devices.py * add: NMR LH and RU device control * fix: modify jiageng devices registry --------- Co-authored-by: Junhan Chang <changjh@pku.edu.cn> * Device/Resource Registry and GraphML support (#37) * add resource type conversion to PLR * add resource registry and test * add docs * fix registry * add solenoid_valve_mock, its registry and test * fix registry for directly using examples * add EvacuateAndRefillProtocol and testcases * allow function sequence call in ACTION * add read & write & extra_info for hardware_interface * Update device_node.py * add solenoid valve * add doc developer guide yaml * fixes for starting IK station * add graphml grouping parser * fix graphml grouping parser * add communication edge parser * fix io solenoid valve * Update .gitignore * Update plates.yaml --------- Co-authored-by: ColumbiaCC <2100011801@stu.pku.edu.cn> * Uni-Lab Doc v0.2 (#39) * add Uni-Lab docs * change doc name * Dev (#41) * Integration with pywinauto & recorder Added execute run and initialize procdure * 酶标仪状态检测、使用示例,整体流程待测试 * nivo ready version * Add HPLC driver and example script - Introduced HPLCDriver class for managing HPLC device status and operations. - Implemented device status monitoring and command execution via ROS2 actions. - Added example script (hplc.py) demonstrating how to run commands on the HPLC device. - Created PlayerUtil and UniversalDriver classes for shared functionality across devices. - Refactored NivoDriver to utilize the new UniversalDriver structure. - Enhanced error handling and process management in the NivoDriver. * 修复start的错误定位 * hplc tested ok * relative path to build msgs * template_driver & jiageng devices * fetch correct status type and action type * fix mtype fetch * gpc bus integration * ilab build * remove chs * recipe rename * modbus update 1 * json available * hplc & modbus rewrite * Update AgilentHPLC.py hplc datafile reader * move ilabos/ros/rpc to ilabos/device_comms/rpc, and merge bioyond/aichemeco files under /devices * modbus分设备 * gpc * gpc 2 * fix address * default register node * fix MainScreenGPC * add Resource srv and message_converter * move graphio to ilabos/resources * refactor resources type conversion * add resource clients in device_node * add mock resources service * pass Gripper1 resource test * update http resource services * add AGV compile function * add AGV transfer protocol * update recipe.yaml * update full mock_gripper edit_id example * update full mock_gripper edit_id example * get and update resource also in protocol_node * mock resource update in AichemecoHiwo * feat: add other jiageng PLC device code * ilabos compile * correct format * correct recipe format * correct setup.py format * remove unnecessary files * remove unnecessary files * Create HT_hiwo.json * add children in resources * hplc support sample_id * correct hplc sample_id * correct hplc sample_id * hplc upload * fix type hint * oss upload tested ver * recipe yaml fix for linux * update installation yaml * refactor: moved all driver files according to its feat * merge main to dev * add HPLC registry and json * 升级 ros2-distro-mutex 依赖版本至 0.6 * 修改 ros2-distro-mutex 依赖版本为通配符匹配 * 更新 ros-humble-ilabos-msgs 依赖为 robostack-humble 命名空间 * add resource type conversion to PLR * add resource registry and test * feat: 更新oss上传 * fix device id * add docs * fix registry * add solenoid_valve_mock, its registry and test * fix registry for directly using examples * add EvacuateAndRefillProtocol and testcases * allow function sequence call in ACTION * add read & write & extra_info for hardware_interface * Update device_node.py * add solenoid valve * add doc developer guide yaml * use robostack-staging * rclpy version test * lower rclpy * ensure 0.6* env * fixes for starting IK station * add graphml grouping parser * fix graphml grouping parser * add communication edge parser * fix io solenoid valve * Update .gitignore * Update plates.yaml * Feature/device node later init (#40) * 修改config路径,方便后续打包 增加device_node打印 * 支持plr序列化/init创建 * 统一命名 * import mgr logger optimize banner print * 日志OK * fix unicorn frame * banner print * correct import format * file path changes * 取消后补全,在加载设备的时候直接替换 * converter update * web page update * 在线device更新,node继承替换 * 修复动作、状态的类型缺失 和 命令提示 * web功能实现结束 * host节点更改完成 新增status时间戳管理 新增每10s动态发现其他node * ros2类型的节点也应该被包一次 * 修复类型提示 * websocket 动态显示状态 * add workflow & book theme for docs * add workflow & book theme for docs * fix workflow build * fix workflow build * 理清启动关系 * stm32 example * mac . name * device_instance device_cls * 新增config添加方式 更新mqtt提示 * plr test * 移动is_host_mode 新增slave_no_host * 确保config优先修改生效 * fix graph io * 支持带参数传入 * 支持物料解析 * 支持物料解析 * device为空的时候不进行绑定或初始化 * protocol node new * protocol node runnable * protocol node runnable --------- Co-authored-by: 王俊杰 <2201110460@stu.pku.edu.cn> Co-authored-by: Junhan Chang <changjh@pku.edu.cn> Co-authored-by: jiawei <miaojiawei@dp.tech> Co-authored-by: ColumbiaCC <2100011801@stu.pku.edu.cn> * Dev (#45) * Integration with pywinauto & recorder Added execute run and initialize procdure * 酶标仪状态检测、使用示例,整体流程待测试 * nivo ready version * Add HPLC driver and example script - Introduced HPLCDriver class for managing HPLC device status and operations. - Implemented device status monitoring and command execution via ROS2 actions. - Added example script (hplc.py) demonstrating how to run commands on the HPLC device. - Created PlayerUtil and UniversalDriver classes for shared functionality across devices. - Refactored NivoDriver to utilize the new UniversalDriver structure. - Enhanced error handling and process management in the NivoDriver. * 修复start的错误定位 * hplc tested ok * relative path to build msgs * template_driver & jiageng devices * fetch correct status type and action type * fix mtype fetch * gpc bus integration * ilab build * remove chs * recipe rename * modbus update 1 * json available * hplc & modbus rewrite * Update AgilentHPLC.py hplc datafile reader * move ilabos/ros/rpc to ilabos/device_comms/rpc, and merge bioyond/aichemeco files under /devices * modbus分设备 * gpc * gpc 2 * fix address * default register node * fix MainScreenGPC * add Resource srv and message_converter * move graphio to ilabos/resources * refactor resources type conversion * add resource clients in device_node * add mock resources service * pass Gripper1 resource test * update http resource services * add AGV compile function * add AGV transfer protocol * update recipe.yaml * update full mock_gripper edit_id example * update full mock_gripper edit_id example * get and update resource also in protocol_node * mock resource update in AichemecoHiwo * feat: add other jiageng PLC device code * ilabos compile * correct format * correct recipe format * correct setup.py format * remove unnecessary files * remove unnecessary files * Create HT_hiwo.json * add children in resources * hplc support sample_id * correct hplc sample_id * correct hplc sample_id * hplc upload * fix type hint * oss upload tested ver * recipe yaml fix for linux * update installation yaml * refactor: moved all driver files according to its feat * merge main to dev * add HPLC registry and json * 升级 ros2-distro-mutex 依赖版本至 0.6 * 修改 ros2-distro-mutex 依赖版本为通配符匹配 * 更新 ros-humble-ilabos-msgs 依赖为 robostack-humble 命名空间 * add resource type conversion to PLR * add resource registry and test * feat: 更新oss上传 * fix device id * add docs * fix registry * add solenoid_valve_mock, its registry and test * fix registry for directly using examples * add EvacuateAndRefillProtocol and testcases * allow function sequence call in ACTION * add read & write & extra_info for hardware_interface * Update device_node.py * add solenoid valve * add doc developer guide yaml * use robostack-staging * rclpy version test * lower rclpy * ensure 0.6* env * fixes for starting IK station * add graphml grouping parser * fix graphml grouping parser * add communication edge parser * fix io solenoid valve * Update .gitignore * Update plates.yaml * Feature/device node later init (#40) * 修改config路径,方便后续打包 增加device_node打印 * 支持plr序列化/init创建 * 统一命名 * import mgr logger optimize banner print * 日志OK * fix unicorn frame * banner print * correct import format * file path changes * 取消后补全,在加载设备的时候直接替换 * converter update * web page update * 在线device更新,node继承替换 * 修复动作、状态的类型缺失 和 命令提示 * web功能实现结束 * host节点更改完成 新增status时间戳管理 新增每10s动态发现其他node * ros2类型的节点也应该被包一次 * 修复类型提示 * websocket 动态显示状态 * add workflow & book theme for docs * add workflow & book theme for docs * fix workflow build * fix workflow build * 理清启动关系 * stm32 example * mac . name * device_instance device_cls * 新增config添加方式 更新mqtt提示 * plr test * 移动is_host_mode 新增slave_no_host * 确保config优先修改生效 * fix graph io * 支持带参数传入 * 支持物料解析 * 支持物料解析 * device为空的时候不进行绑定或初始化 * protocol node new * protocol node runnable * protocol node runnable * Feature/device node later init (#42) * 修改config路径,方便后续打包 增加device_node打印 * 支持plr序列化/init创建 * 统一命名 * import mgr logger optimize banner print * 日志OK * fix unicorn frame * banner print * correct import format * file path changes * 取消后补全,在加载设备的时候直接替换 * converter update * web page update * 在线device更新,node继承替换 * 修复动作、状态的类型缺失 和 命令提示 * web功能实现结束 * host节点更改完成 新增status时间戳管理 新增每10s动态发现其他node * ros2类型的节点也应该被包一次 * 修复类型提示 * websocket 动态显示状态 * add workflow & book theme for docs * add workflow & book theme for docs * fix workflow build * fix workflow build * 理清启动关系 * stm32 example * mac . name * device_instance device_cls * 新增config添加方式 更新mqtt提示 * plr test * 移动is_host_mode 新增slave_no_host * 确保config优先修改生效 * fix graph io * 支持带参数传入 * 支持物料解析 * 支持物料解析 * device为空的时候不进行绑定或初始化 * protocol node new * protocol node runnable * protocol node runnable * action * plr suc * plr suc!! * plr suc!! * plr suc!! * plr msgs * Feature/device node later init (#43) * 修改config路径,方便后续打包 增加device_node打印 * 支持plr序列化/init创建 * 统一命名 * import mgr logger optimize banner print * 日志OK * fix unicorn frame * banner print * correct import format * file path changes * 取消后补全,在加载设备的时候直接替换 * converter update * web page update * 在线device更新,node继承替换 * 修复动作、状态的类型缺失 和 命令提示 * web功能实现结束 * host节点更改完成 新增status时间戳管理 新增每10s动态发现其他node * ros2类型的节点也应该被包一次 * 修复类型提示 * websocket 动态显示状态 * add workflow & book theme for docs * add workflow & book theme for docs * fix workflow build * fix workflow build * 理清启动关系 * stm32 example * mac . name * device_instance device_cls * 新增config添加方式 更新mqtt提示 * plr test * 移动is_host_mode 新增slave_no_host * 确保config优先修改生效 * fix graph io * 支持带参数传入 * 支持物料解析 * 支持物料解析 * device为空的时候不进行绑定或初始化 * protocol node new * protocol node runnable * protocol node runnable * action * plr suc * plr suc!! * plr suc!! * plr suc!! * plr msgs * plr * action * plr reg fix * Feature/device node later init (#44) * 修改config路径,方便后续打包 增加device_node打印 * 支持plr序列化/init创建 * 统一命名 * import mgr logger optimize banner print * 日志OK * fix unicorn frame * banner print * correct import format * file path changes * 取消后补全,在加载设备的时候直接替换 * converter update * web page update * 在线device更新,node继承替换 * 修复动作、状态的类型缺失 和 命令提示 * web功能实现结束 * host节点更改完成 新增status时间戳管理 新增每10s动态发现其他node * ros2类型的节点也应该被包一次 * 修复类型提示 * websocket 动态显示状态 * add workflow & book theme for docs * add workflow & book theme for docs * fix workflow build * fix workflow build * 理清启动关系 * stm32 example * mac . name * device_instance device_cls * 新增config添加方式 更新mqtt提示 * plr test * 移动is_host_mode 新增slave_no_host * 确保config优先修改生效 * fix graph io * 支持带参数传入 * 支持物料解析 * 支持物料解析 * device为空的时候不进行绑定或初始化 * protocol node new * protocol node runnable * protocol node runnable * action * plr suc * plr suc!! * plr suc!! * plr suc!! * plr msgs * plr * fix convert error fix async logic error added async error print * new device test * test resource add * test resource add * test resource add * test resource add * local env setup * node type fix temp fix root_node error fix convert res from type error * resource tracker * fix bug from qhh * fix bug from qhh * fix bug from qhh * fix bug from qhh * refactor MQTT client logging and connection handling; update group ID in config * driver_params allow empty * allow other init param * fix driver param and enhance type hint * refactor MQConfig to use double quotes for string literals * fix wrong function calling * fix wrong function calling * fix log for mac * fix networkx compatibility * add mqtt loggers * add action to jsonschema converter * random client id * type converter & registry * correct conversion * fix action publish only from discovered devices * add "Bio" tag for action doc generation * 改进module提示 * Fix doc * mqtt不连接也可用 性价样例提示 * add docs * 更新plr test案例 * Update intro.md * 更新有机案例 * skip --------- Co-authored-by: Harvey Que <Q-Query@outlook.com> Co-authored-by: Junhan Chang <1700011741@pku.edu.cn> --------- Co-authored-by: 王俊杰 <2201110460@stu.pku.edu.cn> Co-authored-by: Junhan Chang <changjh@pku.edu.cn> Co-authored-by: jiawei <miaojiawei@dp.tech> Co-authored-by: ColumbiaCC <2100011801@stu.pku.edu.cn> Co-authored-by: Harvey Que <Q-Query@outlook.com> Co-authored-by: Junhan Chang <1700011741@pku.edu.cn> * Canonicalize before Open Source (#46) * big big refactor try01 * refactor 02 --------- Co-authored-by: ck <xiaoyeqiannian@163.com> Co-authored-by: 王俊杰 <1800011822@pku.edu.cn> Co-authored-by: q434343 <554662886@qq.com> Co-authored-by: Xuwznln <xuwznln@gmail.com> Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> Co-authored-by: wjjxxx <43375851+wjjxxx@users.noreply.github.com> Co-authored-by: 3218923350 <105201755+3218923350@users.noreply.github.com> Co-authored-by: Xuwznln <1023025701@qq.com> Co-authored-by: 王俊杰 <2201110460@stu.pku.edu.cn> Co-authored-by: jiawei <miaojiawei@dp.tech> Co-authored-by: Jiawei <91898272+jiawei723@users.noreply.github.com> Co-authored-by: ColumbiaCC <2100011801@stu.pku.edu.cn> Co-authored-by: Harvey Que <Q-Query@outlook.com>
This commit is contained in:
4
unilabos/devices/motor/Consts.py
Normal file
4
unilabos/devices/motor/Consts.py
Normal file
@@ -0,0 +1,4 @@
|
||||
class Config(object):
|
||||
DEBUG_MODE = True
|
||||
OPEN_DEVICE = True
|
||||
DEVICE_ADDRESS = 0x01
|
||||
21
unilabos/devices/motor/FakeSerial.py
Normal file
21
unilabos/devices/motor/FakeSerial.py
Normal file
@@ -0,0 +1,21 @@
|
||||
class FakeSerial:
|
||||
def __init__(self):
|
||||
self.data = b''
|
||||
|
||||
def write(self, data):
|
||||
print("发送数据: ", end="")
|
||||
for i in data:
|
||||
print(f"{i:02x}", end=" ")
|
||||
print() # 换行
|
||||
# 这里可模拟把假数据写到某个内部缓存
|
||||
# self.data = ...
|
||||
|
||||
def setRTS(self, b):
|
||||
pass
|
||||
|
||||
def read(self, n):
|
||||
# 这里可返回预设的响应,例如 b'\x01\x03\x02\x00\x19\x79\x8E'
|
||||
return b'\x01\x03\x02\x00\x19\x79\x8E'
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
153
unilabos/devices/motor/Grasp.py
Normal file
153
unilabos/devices/motor/Grasp.py
Normal file
@@ -0,0 +1,153 @@
|
||||
import time
|
||||
from serial import Serial
|
||||
from threading import Thread
|
||||
|
||||
from unilabos.device_comms.universal_driver import UniversalDriver
|
||||
|
||||
class EleGripper(UniversalDriver):
|
||||
@property
|
||||
def status(self) -> str:
|
||||
return f"spin_pos: {self.rot_data[0]}, grasp_pos: {self.gri_data[0]}, spin_v: {self.rot_data[1]}, grasp_v: {self.gri_data[1]}, spin_F: {self.rot_data[2]}, grasp_F: {self.gri_data[2]}"
|
||||
|
||||
def __init__(self,port,baudrate=115200, pos_error=-11, id = 9):
|
||||
self.ser = Serial(port,baudrate)
|
||||
self.pos_error = pos_error
|
||||
|
||||
self.success = False
|
||||
|
||||
# [pos, speed, force]
|
||||
self.gri_data = [0,0,0]
|
||||
self.rot_data = [0,0,0]
|
||||
|
||||
self.id = id
|
||||
|
||||
self.init_gripper()
|
||||
self.wait_for_gripper_init()
|
||||
|
||||
t = Thread(target=self.data_loop)
|
||||
t.start()
|
||||
|
||||
self.gripper_move(0,255,255)
|
||||
self.rotate_move_abs(0,255,255)
|
||||
|
||||
def modbus_crc(self, data: bytes) -> bytes:
|
||||
crc = 0xFFFF
|
||||
for pos in data:
|
||||
crc ^= pos
|
||||
for _ in range(8):
|
||||
if (crc & 0x0001) != 0:
|
||||
crc >>= 1
|
||||
crc ^= 0xA001
|
||||
else:
|
||||
crc >>= 1
|
||||
return crc.to_bytes(2, byteorder='little')
|
||||
|
||||
def send_cmd(self, id, fun, address, data:str):
|
||||
data_len = len(bytes.fromhex(data))
|
||||
data_ = f"{id:02X} {fun} {address} {data_len//2:04X} {data_len:02X} {data}"
|
||||
data_bytes = bytes.fromhex(data_)
|
||||
data_with_checksum = data_bytes + self.modbus_crc(data_bytes)
|
||||
self.ser.write(data_with_checksum)
|
||||
time.sleep(0.01)
|
||||
self.ser.read(8)
|
||||
|
||||
def read_address(self, id , address, data_len):
|
||||
data = f"{id:02X} 03 {address} {data_len:04X}"
|
||||
data_bytes = bytes.fromhex(data)
|
||||
data_with_checksum = data_bytes + self.modbus_crc(data_bytes)
|
||||
self.ser.write(data_with_checksum)
|
||||
time.sleep(0.01)
|
||||
res_len = 5+data_len*2
|
||||
res = self.ser.read(res_len)
|
||||
return res
|
||||
|
||||
def init_gripper(self):
|
||||
self.send_cmd(self.id,'10','03 E8','00 01')
|
||||
self.send_cmd(self.id,'10','03 E9','00 01')
|
||||
|
||||
def gripper_move(self, pos, speed, force):
|
||||
self.send_cmd(self.id,'10', '03 EA', f"{speed:02x} {pos:02x} {force:02x} 01")
|
||||
|
||||
def rotate_move_abs(self, pos, speed, force):
|
||||
pos += self.pos_error
|
||||
if pos < 0:
|
||||
pos = (1 << 16) + pos
|
||||
self.send_cmd(self.id,'10', '03 EC', f"{(pos):04x} {force:02x} {speed:02x} 0000 00 01")
|
||||
|
||||
# def rotate_move_rel(self, pos, speed, force):
|
||||
# if pos < 0:
|
||||
# pos = (1 << 16) + pos
|
||||
# print(f'{pos:04x}')
|
||||
# self.send_cmd(self.id,'10', '03 EC', f"0000 {force:02x} {speed:02x} {pos:04x} 00 02")
|
||||
|
||||
def wait_for_gripper_init(self):
|
||||
res = self.read_address(self.id, "07 D0", 1)
|
||||
res_r = self.read_address(self.id, "07 D1", 1)
|
||||
while res[4] == 0x08 or res_r[4] == 0x08:
|
||||
res = self.read_address(self.id, "07 D0", 1)
|
||||
res_r = self.read_address(self.id, "07 D1", 1)
|
||||
time.sleep(0.1)
|
||||
|
||||
def wait_for_gripper(self):
|
||||
while self.gri_data[1] != 0:
|
||||
time.sleep(0.1)
|
||||
|
||||
def wait_for_rotate(self):
|
||||
while self.rot_data[1] != 0:
|
||||
time.sleep(0.1)
|
||||
|
||||
def data_reader(self):
|
||||
res_g = self.read_address(self.id, "07 D2", 2)
|
||||
res_r = self.read_address(self.id, "07 D4", 4)
|
||||
int32_value = (res_r[3] << 8) | res_r[4]
|
||||
if int32_value > 0x7FFF:
|
||||
int32_value -= 0x10000
|
||||
self.gri_data = (int(res_g[4]), int(res_g[3]), int(res_g[5]))
|
||||
self.rot_data = (int32_value, int(res_r[5]), int(res_r[6]))
|
||||
|
||||
def data_loop(self):
|
||||
while True:
|
||||
self.data_reader()
|
||||
time.sleep(0.1)
|
||||
|
||||
|
||||
def node_gripper_move(self, cmd:str):
|
||||
self.success = False
|
||||
|
||||
try:
|
||||
cmd_list = [int(x) for x in cmd.replace(' ','').split(',')]
|
||||
self.gripper_move(*cmd_list)
|
||||
self.wait_for_gripper()
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
self.success = True
|
||||
|
||||
def node_rotate_move(self, cmd:str):
|
||||
self.success = False
|
||||
|
||||
try:
|
||||
cmd_list = [int(x) for x in cmd.replace(' ','').split(',')]
|
||||
self.rotate_move_abs(*cmd_list)
|
||||
self.wait_for_rotate()
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
self.success = True
|
||||
|
||||
def move_and_rotate(self, spin_pos, grasp_pos, spin_v, grasp_v, spin_F, grasp_F):
|
||||
self.gripper_move(grasp_pos, grasp_v, grasp_F)
|
||||
self.wait_for_gripper()
|
||||
self.rotate_move_abs(spin_pos, spin_v, spin_F)
|
||||
self.wait_for_rotate()
|
||||
|
||||
if __name__ == "__main__":
|
||||
gripper = EleGripper("COM12")
|
||||
gripper.init_gripper()
|
||||
gripper.wait_for_gripper_init()
|
||||
gripper.gripper_move(210,127,255)
|
||||
gripper.wait_for_gripper()
|
||||
gripper.rotate_move_abs(135,10,255)
|
||||
gripper.data_reader()
|
||||
print(gripper.rot_data)
|
||||
|
||||
87
unilabos/devices/motor/Utils.py
Normal file
87
unilabos/devices/motor/Utils.py
Normal file
@@ -0,0 +1,87 @@
|
||||
import struct
|
||||
import time
|
||||
from typing import Union
|
||||
|
||||
from .Consts import Config
|
||||
|
||||
def calculate_modbus_crc16(data: bytes) -> tuple[int, int]:
|
||||
"""
|
||||
计算 Modbus RTU 的 CRC16 校验码,返回 (low_byte, high_byte)。
|
||||
data 可以是 bytes 或者 bytearray
|
||||
"""
|
||||
crc = 0xFFFF
|
||||
for byte in data:
|
||||
crc ^= byte
|
||||
for _ in range(8):
|
||||
if (crc & 0x0001):
|
||||
crc = (crc >> 1) ^ 0xA001
|
||||
else:
|
||||
crc >>= 1
|
||||
|
||||
# 低字节在前、高字节在后
|
||||
low_byte = crc & 0xFF
|
||||
high_byte = (crc >> 8) & 0xFF
|
||||
return low_byte, high_byte
|
||||
|
||||
|
||||
def create_command(slave_id, func_code, address, data):
|
||||
"""
|
||||
生成完整的 Modbus 通信指令:
|
||||
- 第1字节: 从站地址
|
||||
- 第2字节: 功能码
|
||||
- 第3~4字节: 寄存器地址 (大端)
|
||||
- 第5~6字节: 数据(或读寄存器个数) (大端)
|
||||
- 第7~8字节: CRC校验, 先低后高
|
||||
"""
|
||||
# 按照大端格式打包:B(1字节), B(1字节), H(2字节), H(2字节)
|
||||
# 例如: 0x03, 0x03, 0x0191, 0x0001
|
||||
# 生成的命令将是: 03 03 01 91 00 01 (不含 CRC)
|
||||
command = struct.pack(">B B H H", slave_id, func_code, address, data)
|
||||
|
||||
# 计算CRC,并将低字节、后高字节拼到末尾
|
||||
low_byte, high_byte = calculate_modbus_crc16(command)
|
||||
return command + bytes([low_byte, high_byte])
|
||||
|
||||
|
||||
def send_command(ser, command) -> Union[bytes, str]:
|
||||
"""通过串口发送指令并打印响应"""
|
||||
# Modbus RTU 半双工,发送前拉高 RTS
|
||||
ser.setRTS(True)
|
||||
time.sleep(0.02)
|
||||
ser.write(command) # 发送指令
|
||||
if Config.OPEN_DEVICE:
|
||||
# 如果是实际串口,就打印16进制的发送内容
|
||||
print(f"发送的数据: ", end="")
|
||||
for ind, c in enumerate(command.hex().upper()):
|
||||
if ind % 2 == 0 and ind != 0:
|
||||
print(" ", end="")
|
||||
print(c, end="")
|
||||
|
||||
# 发送完成后,切换到接收模式
|
||||
ser.setRTS(False)
|
||||
|
||||
# 读取响应,具体长度要看从站返回,有时多字节
|
||||
response = ser.read(8) # 假设响应是8字节
|
||||
print(f"接收到的数据: ", end="")
|
||||
for ind, c in enumerate(response.hex().upper()):
|
||||
if ind % 2 == 0 and ind != 0:
|
||||
print(" ", end="")
|
||||
print(c, end="")
|
||||
print()
|
||||
return response
|
||||
|
||||
def get_result_byte_int(data: bytes, byte_start: int = 6, byte_end: int = 10) -> int:
|
||||
return int(data.hex()[byte_start:byte_end], 16)
|
||||
|
||||
def get_result_byte_str(data: bytes, byte_start: int = 6, byte_end: int = 10) -> str:
|
||||
return data.hex()[byte_start:byte_end]
|
||||
|
||||
def run_commands(ser, duration=0.1, *commands):
|
||||
for cmd in commands:
|
||||
if isinstance(cmd, list):
|
||||
for c in cmd:
|
||||
send_command(ser, c)
|
||||
time.sleep(duration)
|
||||
else:
|
||||
send_command(ser, cmd)
|
||||
time.sleep(duration)
|
||||
0
unilabos/devices/motor/__init__.py
Normal file
0
unilabos/devices/motor/__init__.py
Normal file
37
unilabos/devices/motor/base_types/PrPath.py
Normal file
37
unilabos/devices/motor/base_types/PrPath.py
Normal file
@@ -0,0 +1,37 @@
|
||||
from enum import Enum
|
||||
|
||||
class PR_PATH(Enum):
|
||||
# TYPE (Bit0-3)
|
||||
TYPE_NO_ACTION = 0x0000 # 无动作
|
||||
TYPE_POSITIONING = 0x0001 # 位置定位
|
||||
TYPE_VELOCITY = 0x0002 # 速度运行
|
||||
TYPE_HOME = 0x0003 # 回零
|
||||
|
||||
# INS (Bit4) - 默认都是插断模式
|
||||
INS_INTERRUPT = 0x0010 # 插断
|
||||
|
||||
# OVLP (Bit5) - 默认都是不重叠
|
||||
OVLP_NO_OVERLAP = 0x0000 # 不重叠
|
||||
|
||||
# POSITION MODE (Bit6)
|
||||
POS_ABSOLUTE = 0x0000 # 绝对位置
|
||||
POS_RELATIVE = 0x0040 # 相对位置
|
||||
|
||||
# MOTOR MODE (Bit7)
|
||||
MOTOR_ABSOLUTE = 0x0000 # 绝对电机
|
||||
MOTOR_RELATIVE = 0x0080 # 相对电机
|
||||
|
||||
# 常用组合(默认都是插断且不重叠)
|
||||
# 位置定位相关
|
||||
ABS_POS = TYPE_POSITIONING | INS_INTERRUPT | OVLP_NO_OVERLAP | POS_ABSOLUTE # 绝对定位
|
||||
REL_POS = TYPE_POSITIONING | INS_INTERRUPT | OVLP_NO_OVERLAP | POS_RELATIVE # 相对定位
|
||||
|
||||
# 速度运行相关
|
||||
VELOCITY = TYPE_VELOCITY | INS_INTERRUPT | OVLP_NO_OVERLAP # 速度模式
|
||||
|
||||
# 回零相关
|
||||
HOME = TYPE_HOME | INS_INTERRUPT | OVLP_NO_OVERLAP # 回零模式
|
||||
|
||||
# 电机模式组合
|
||||
ABS_POS_REL_MOTOR = ABS_POS | MOTOR_RELATIVE # 绝对定位+相对电机
|
||||
REL_POS_REL_MOTOR = REL_POS | MOTOR_RELATIVE # 相对定位+相对电机
|
||||
0
unilabos/devices/motor/base_types/__init__.py
Normal file
0
unilabos/devices/motor/base_types/__init__.py
Normal file
92
unilabos/devices/motor/commands/PositionControl.py
Normal file
92
unilabos/devices/motor/commands/PositionControl.py
Normal file
@@ -0,0 +1,92 @@
|
||||
|
||||
from ..base_types.PrPath import PR_PATH
|
||||
from ..Utils import create_command, get_result_byte_int, get_result_byte_str, send_command
|
||||
|
||||
|
||||
def create_position_commands(slave_id: int, which: int, how: PR_PATH,
|
||||
position: float, velocity: int = 300,
|
||||
acc_time: int = 50, dec_time: int = 50,
|
||||
special: int = 0) -> list[list[bytes]]:
|
||||
"""
|
||||
创建位置相关的Modbus命令列表
|
||||
|
||||
Args:
|
||||
slave_id: 从站地址
|
||||
which: PR路径号(0-7)
|
||||
how: PR_PATH枚举,定义运动方式
|
||||
position: 目标位置(Pulse)
|
||||
velocity: 运行速度(rpm)
|
||||
acc_time: 加速时间(ms/Krpm),默认50
|
||||
dec_time: 减速时间(ms/Krpm),默认50
|
||||
special: 特殊参数,默认0
|
||||
|
||||
Returns:
|
||||
包含所有设置命令的列表
|
||||
"""
|
||||
if not 0 <= which <= 7:
|
||||
raise ValueError("which必须在0到7之间")
|
||||
|
||||
base_addr = 0x6200 + which * 8
|
||||
|
||||
# 位置值保持原样(Pulse)
|
||||
position = int(position)
|
||||
print(f"路径方式 {' '.join(bin(how.value)[2:])} 位置 {position} 速度 {velocity}")
|
||||
position_high = (position >> 16) & 0xFFFF # 获取高16位
|
||||
position_low = position & 0xFFFF # 获取低16位
|
||||
|
||||
# 速度值(rpm)转换为0x0000格式
|
||||
velocity_value = 0x0000 + velocity
|
||||
|
||||
# 加减速时间(ms/Krpm)转换为0x0000格式
|
||||
acc_time_value = 0x0000 + int(acc_time)
|
||||
dec_time_value = 0x0000 + int(dec_time)
|
||||
|
||||
# 特殊参数转换为0x0000格式
|
||||
special_value = 0x0000 + special
|
||||
return [
|
||||
create_command(slave_id, 0x06, base_addr + 0, how.value), # 路径方式
|
||||
create_command(slave_id, 0x06, base_addr + 1, position_high), # 位置高16位
|
||||
create_command(slave_id, 0x06, base_addr + 2, position_low), # 位置低16位
|
||||
create_command(slave_id, 0x06, base_addr + 3, velocity_value), # 运行速度
|
||||
create_command(slave_id, 0x06, base_addr + 4, acc_time_value), # 加速时间
|
||||
create_command(slave_id, 0x06, base_addr + 5, dec_time_value), # 减速时间
|
||||
create_command(slave_id, 0x06, base_addr + 6, special_value), # 特殊参数
|
||||
]
|
||||
|
||||
def create_position_run_command(slave_id: int, which: int) -> list[list[bytes]]:
|
||||
print(f"运行路径 PR{which}")
|
||||
return [create_command(slave_id, 0x06, 0x6002, 0x0010 + which)]
|
||||
|
||||
def run_set_position_zero(ser, DEVICE_ADDRESS) -> list[list[bytes]]:
|
||||
print(f"手动回零")
|
||||
send_command(ser, create_command(DEVICE_ADDRESS, 0x06, 0x6002, 0x0021))
|
||||
|
||||
def run_stop(ser, DEVICE_ADDRESS) -> list[list[bytes]]:
|
||||
print(f"急停")
|
||||
send_command(ser, create_command(DEVICE_ADDRESS, 0x06, 0x6002, 0x0040))
|
||||
|
||||
def run_set_forward_run(ser, DEVICE_ADDRESS) -> list[list[bytes]]:
|
||||
print(f"设定正方向运动")
|
||||
send_command(ser, create_command(DEVICE_ADDRESS, 0x06, 0x0007, 0x0000))
|
||||
|
||||
def run_set_backward_run(ser, DEVICE_ADDRESS) -> list[list[bytes]]:
|
||||
print(f"设定反方向运动")
|
||||
send_command(ser, create_command(DEVICE_ADDRESS, 0x06, 0x0007, 0x0001))
|
||||
|
||||
def run_get_command_position(ser, DEVICE_ADDRESS, print_pos=True) -> int:
|
||||
retH = send_command(ser, create_command(DEVICE_ADDRESS, 0x03, 0x602A, 0x0001)) # 命令位置H
|
||||
retL = send_command(ser, create_command(DEVICE_ADDRESS, 0x03, 0x602B, 0x0001)) # 命令位置L
|
||||
value = get_result_byte_str(retH) + get_result_byte_str(retL)
|
||||
value = int(value, 16)
|
||||
if print_pos:
|
||||
print(f"命令位置: {value}")
|
||||
return value
|
||||
|
||||
def run_get_motor_position(ser, DEVICE_ADDRESS, print_pos=True) -> int:
|
||||
retH = send_command(ser, create_command(DEVICE_ADDRESS, 0x03, 0x602C, 0x0001)) # 电机位置H
|
||||
retL = send_command(ser, create_command(DEVICE_ADDRESS, 0x03, 0x602D, 0x0001)) # 电机位置L
|
||||
value = get_result_byte_str(retH) + get_result_byte_str(retL)
|
||||
value = int(value, 16)
|
||||
if print_pos:
|
||||
print(f"电机位置: {value}")
|
||||
return value
|
||||
44
unilabos/devices/motor/commands/Status.py
Normal file
44
unilabos/devices/motor/commands/Status.py
Normal file
@@ -0,0 +1,44 @@
|
||||
import time
|
||||
from ..Utils import create_command, send_command
|
||||
from .PositionControl import run_get_motor_position
|
||||
|
||||
|
||||
def run_get_status(ser, DEVICE_ADDRESS, print_status=True) -> list[list[bytes]]:
|
||||
# Bit0 故障
|
||||
# Bit1 使能
|
||||
# Bit2 运行
|
||||
# Bit4 指令完成
|
||||
# Bit5 路径完成
|
||||
# Bit6 回零完成
|
||||
ret = send_command(ser, create_command(DEVICE_ADDRESS, 0x03, 0x1003, 0x0001))
|
||||
status = bin(int(ret.hex()[6:10], 16))[2:]
|
||||
# 用0左位补齐
|
||||
status = status.zfill(8)
|
||||
status_info_list = []
|
||||
if status[-1] == "1":
|
||||
status_info_list.append("故障")
|
||||
if status[-2] == "1":
|
||||
status_info_list.append("使能")
|
||||
if status[-3] == "1":
|
||||
status_info_list.append("运行")
|
||||
if status[-5] == "1":
|
||||
status_info_list.append("指令完成")
|
||||
if status[-6] == "1":
|
||||
status_info_list.append("路径完成")
|
||||
if status[-7] == "1":
|
||||
status_info_list.append("回零完成")
|
||||
if print_status:
|
||||
print(f"获取状态: {' '.join(status_info_list)}")
|
||||
return status_info_list
|
||||
|
||||
def run_until_status(ser, DEVICE_ADDRESS, status_info: str, max_time=10):
|
||||
start_time = time.time()
|
||||
while True:
|
||||
ret = run_get_status(ser, DEVICE_ADDRESS)
|
||||
if status_info in ret:
|
||||
break
|
||||
if time.time() - start_time > max_time:
|
||||
print(f"状态未达到 {status_info} 超时")
|
||||
return False
|
||||
time.sleep(0.05)
|
||||
return True
|
||||
12
unilabos/devices/motor/commands/Warning.py
Normal file
12
unilabos/devices/motor/commands/Warning.py
Normal file
@@ -0,0 +1,12 @@
|
||||
from serial import Serial
|
||||
from ..Consts import Config
|
||||
from ..Utils import create_command, send_command
|
||||
|
||||
|
||||
def create_elimate_warning_command(DEVICE_ADDRESS):
|
||||
return create_command(DEVICE_ADDRESS, 0x06, 0x0145, 0x0087)
|
||||
|
||||
|
||||
def run_elimate_warning(ser: Serial, DEVICE_ADDRESS):
|
||||
send_command(ser, create_elimate_warning_command(DEVICE_ADDRESS))
|
||||
print("清除警报")
|
||||
0
unilabos/devices/motor/commands/__init__.py
Normal file
0
unilabos/devices/motor/commands/__init__.py
Normal file
126
unilabos/devices/motor/iCL42.py
Normal file
126
unilabos/devices/motor/iCL42.py
Normal file
@@ -0,0 +1,126 @@
|
||||
import os
|
||||
import sys
|
||||
from abc import abstractmethod
|
||||
from typing import Optional
|
||||
|
||||
import serial
|
||||
|
||||
from .Consts import Config
|
||||
from .FakeSerial import FakeSerial
|
||||
from .Utils import run_commands
|
||||
from .base_types.PrPath import PR_PATH
|
||||
from .commands.PositionControl import run_get_command_position, run_set_forward_run, create_position_commands, \
|
||||
create_position_run_command
|
||||
from .commands.Warning import run_elimate_warning
|
||||
|
||||
try:
|
||||
from unilabos.utils.pywinauto_util import connect_application, get_process_pid_by_name, get_ui_path_with_window_specification, print_wrapper_identifiers
|
||||
from unilabos.device_comms.universal_driver import UniversalDriver, SingleRunningExecutor
|
||||
from unilabos.device_comms import universal_driver as ud
|
||||
except Exception as e:
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..")))
|
||||
from unilabos.utils.pywinauto_util import connect_application, get_process_pid_by_name, get_ui_path_with_window_specification, print_wrapper_identifiers
|
||||
from unilabos.device_comms.universal_driver import UniversalDriver, SingleRunningExecutor
|
||||
from unilabos.devices.template_driver import universal_driver as ud
|
||||
print(f"使用文件DEBUG运行: {e}")
|
||||
|
||||
|
||||
class iCL42Driver(UniversalDriver):
|
||||
_ser: Optional[serial.Serial | FakeSerial] = None
|
||||
_motor_position: Optional[int] = None
|
||||
_DEVICE_COM: Optional[str] = None
|
||||
_DEVICE_ADDRESS: Optional[int] = None
|
||||
# 运行状态
|
||||
_is_executing_run: bool = False
|
||||
_success: bool = False
|
||||
|
||||
@property
|
||||
def motor_position(self) -> int:
|
||||
return self._motor_position
|
||||
|
||||
@property
|
||||
def is_executing_run(self) -> bool:
|
||||
return self._is_executing_run
|
||||
|
||||
@property
|
||||
def success(self) -> bool:
|
||||
return self._success
|
||||
|
||||
def init_device(self):
|
||||
# 配置串口参数
|
||||
if Config.OPEN_DEVICE:
|
||||
self._ser = serial.Serial(
|
||||
port=self._DEVICE_COM, # 串口号
|
||||
baudrate=38400, # 波特率
|
||||
bytesize=serial.EIGHTBITS, # 数据位
|
||||
parity=serial.PARITY_NONE, # 校验位 N-无校验
|
||||
stopbits=serial.STOPBITS_TWO, # 停止位
|
||||
timeout=1 # 超时时间
|
||||
)
|
||||
else:
|
||||
self._ser = FakeSerial()
|
||||
|
||||
def run_motor(self, mode: str, position: float, velocity: int):
|
||||
if self._ser is None:
|
||||
print("Device is not initialized")
|
||||
self._success = False
|
||||
return False
|
||||
def post_func(res, _):
|
||||
self._success = res
|
||||
if not res:
|
||||
self._is_executing_run = False
|
||||
ins: SingleRunningExecutor = SingleRunningExecutor.get_instance(
|
||||
self.execute_run_motor, post_func
|
||||
)
|
||||
# if not ins.is_ended and ins.is_started:
|
||||
# print("Function is running")
|
||||
# self._success = False
|
||||
# return False
|
||||
# elif not ins.is_started:
|
||||
# print("Function started")
|
||||
# ins.start() # 开始执行
|
||||
# else:
|
||||
# print("Function reset and started")
|
||||
ins.reset()
|
||||
ins.start(mode=mode, position=position, velocity=velocity)
|
||||
|
||||
def execute_run_motor(self, mode: str, position: float, velocity: int):
|
||||
position = int(position * 1000)
|
||||
PR = 0
|
||||
run_elimate_warning(self._ser, self._DEVICE_ADDRESS)
|
||||
run_set_forward_run(self._ser, self._DEVICE_ADDRESS)
|
||||
run_commands(
|
||||
self._ser, 0.1,
|
||||
create_position_commands(self._DEVICE_ADDRESS, PR, PR_PATH[mode], position, velocity), # 41.8cm 21.8cm
|
||||
create_position_run_command(self._DEVICE_ADDRESS, PR),
|
||||
)
|
||||
# if run_until_status(self._ser, self._DEVICE_ADDRESS, "路径完成"):
|
||||
# pass
|
||||
|
||||
|
||||
def __init__(self, device_com: str = "COM9", device_address: int = 0x01):
|
||||
self._DEVICE_COM = device_com
|
||||
self._DEVICE_ADDRESS = device_address
|
||||
self.init_device()
|
||||
# 启动所有监控器
|
||||
self.checkers = [
|
||||
# PositionChecker(self, 1),
|
||||
]
|
||||
for checker in self.checkers:
|
||||
checker.start_monitoring()
|
||||
|
||||
@abstractmethod
|
||||
class DriverChecker(ud.DriverChecker):
|
||||
driver: iCL42Driver
|
||||
|
||||
class PositionChecker(DriverChecker):
|
||||
def check(self):
|
||||
# noinspection PyProtectedMember
|
||||
if self.driver._ser is not None:
|
||||
# noinspection PyProtectedMember
|
||||
self.driver._motor_position = run_get_command_position(self.driver._ser, self.driver._DEVICE_ADDRESS)
|
||||
|
||||
# 示例用法
|
||||
if __name__ == "__main__":
|
||||
driver = iCL42Driver("COM3")
|
||||
driver._is_executing_run = True
|
||||
79
unilabos/devices/motor/test.py
Normal file
79
unilabos/devices/motor/test.py
Normal file
@@ -0,0 +1,79 @@
|
||||
# 使用pyserial进行485的协议通信,端口指定为COM4
|
||||
import serial
|
||||
from serial.rs485 import RS485Settings
|
||||
import traceback
|
||||
|
||||
|
||||
from Consts import Config
|
||||
from FakeSerial import FakeSerial
|
||||
from base_types.PrPath import PR_PATH
|
||||
from Utils import run_commands
|
||||
from commands.PositionControl import create_position_commands, create_position_run_command, run_get_command_position, run_get_motor_position, run_set_forward_run
|
||||
from commands.Status import run_get_status, run_until_status
|
||||
from commands.Warning import run_elimate_warning
|
||||
from Grasp import EleGripper
|
||||
|
||||
|
||||
DEVICE_ADDRESS = Config.DEVICE_ADDRESS
|
||||
|
||||
# 配置串口参数
|
||||
if Config.OPEN_DEVICE:
|
||||
ser = serial.Serial(
|
||||
port='COM11', # 串口号
|
||||
baudrate=38400, # 波特率
|
||||
bytesize=serial.EIGHTBITS, # 数据位
|
||||
parity=serial.PARITY_NONE, # 校验位 N-无校验
|
||||
stopbits=serial.STOPBITS_TWO, # 停止位
|
||||
timeout=1 # 超时时间
|
||||
)
|
||||
else:
|
||||
ser = FakeSerial()
|
||||
|
||||
# RS485模式支持(如果需要)
|
||||
try:
|
||||
ser.rs485_mode = RS485Settings(
|
||||
rts_level_for_tx=True,
|
||||
rts_level_for_rx=False,
|
||||
# delay_before_tx=0.01,
|
||||
# delay_before_rx=0.01
|
||||
)
|
||||
except AttributeError:
|
||||
traceback.print_exc()
|
||||
print("RS485模式需要支持的硬件和pyserial版本")
|
||||
|
||||
# run_set_position_zero(ser, DEVICE_ADDRESS)
|
||||
|
||||
# api.get_running_state(ser, DEVICE_ADDRESS)
|
||||
gripper = EleGripper("COM12")
|
||||
gripper.init_gripper()
|
||||
gripper.wait_for_gripper_init()
|
||||
PR = 0
|
||||
run_get_status(ser, DEVICE_ADDRESS)
|
||||
run_elimate_warning(ser, DEVICE_ADDRESS)
|
||||
run_set_forward_run(ser, DEVICE_ADDRESS)
|
||||
run_commands(
|
||||
ser, 0.1,
|
||||
create_position_commands(DEVICE_ADDRESS, PR, PR_PATH.ABS_POS, 20 * 1000, 300), # 41.8cm 21.8cm
|
||||
create_position_run_command(DEVICE_ADDRESS, PR),
|
||||
)
|
||||
if run_until_status(ser, DEVICE_ADDRESS, "路径完成"):
|
||||
pass
|
||||
gripper.gripper_move(210,127,255)
|
||||
gripper.wait_for_gripper()
|
||||
gripper.rotate_move_abs(135,10,255)
|
||||
gripper.data_reader()
|
||||
print(gripper.rot_data)
|
||||
run_commands(
|
||||
ser, 0.1,
|
||||
create_position_commands(DEVICE_ADDRESS, PR, PR_PATH.ABS_POS, 30 * 1000, 300), # 41.8cm 21.8cm
|
||||
create_position_run_command(DEVICE_ADDRESS, PR),
|
||||
)
|
||||
gripper.gripper_move(210,127,255)
|
||||
gripper.wait_for_gripper()
|
||||
gripper.rotate_move_abs(135,10,255)
|
||||
gripper.data_reader()
|
||||
|
||||
# run_get_command_position(ser, DEVICE_ADDRESS)
|
||||
# run_get_motor_position(ser, DEVICE_ADDRESS)
|
||||
|
||||
# ser.close()
|
||||
Reference in New Issue
Block a user