Commit 5e75d82c authored by 张 煜's avatar 张 煜
Browse files

定时检测程序写完,进行测试

parent c4b3a7f9
No related merge requests found
Showing with 89 additions and 7 deletions
+89 -7
......@@ -109,8 +109,8 @@ generate_messages(
## CATKIN_DEPENDS: catkin_packages dependent projects also need
## DEPENDS: system dependencies of this project that dependent projects also need
catkin_package(
INCLUDE_DIRS include
LIBRARIES sage_magnet_opt
#INCLUDE_DIRS include
#LIBRARIES sage_magnet_opt
CATKIN_DEPENDS actionlib actionlib_msgs roscpp rospy smach std_msgs message_runtime
# DEPENDS system_lib
)
......@@ -122,7 +122,7 @@ catkin_package(
## Specify additional locations of header files
## Your package locations should be listed before other locations
include_directories(
include
#include
${catkin_INCLUDE_DIRS}
)
......
......@@ -8,6 +8,67 @@ import sage_arm_msgs.srv
from sage_magnet_opt.srv import ClawMagnetControl
from sage_magnet_opt.srv import ClawMagnetControlResponse
#全局变量
##task函数
#task状态更新
def task_state_update():
global task_state_last
current_state = rospy.get_param("task_is_running")
if current_state == task_state_last:
task_state_last = current_state
return True # 状态变化,返回 False
else:
task_state_last = current_state
return False # 状态稳定,返回 True
#task状态检测#task_numb 是检测多少次认定可以去进行一次磁性检测
def task_state_check(task_numb):
global task_count
if task_state_update():
task_count += 1
if task_count>task_numb:
task_count = 0
#检测到状态长时间未改变,检测磁铁状态
if magnet_state_check():
#磁铁与状态检测正式开始
if magnet_error_check(10,2):#默认10,60->总共10min
rospy.logerr("检测到磁铁异常上电时间过长,进入磁铁保护")
set_magnet(False)
else:
return
else:
return
else:
task_count = 0
return
#磁铁错误检查开始,1min检测一次检测几分钟,并添加最大检测时间为20min
def magnet_error_check(error_num,check_time_sec,timeout_min=20):
check_HZ = 1/check_time_sec
err_count = 0
start_time = rospy.get_time()
rate = rospy.Rate(check_HZ) # 1分钟检测一次
# 将 timeout_min 转换为秒
timeout_sec = timeout_min * 60
while err_count < error_num and (rospy.get_time() - start_time) < timeout_sec:
#如果状态发生改变就退出
if task_state_update() and magnet_state_check():
err_count += 1
#当状态改变,或者磁性不是true了立马弹出重新计数
else:
err_count = 0
return False
rate.sleep() # 控制循环频率
#当超时依然状态不正常时弹出
return True
# 磁铁控制函数
def set_magnet(state):
......@@ -32,8 +93,12 @@ def magnet_set_callback(request):
def magnet_state_check():
res = get_io_state()
if (res.D0[0]) or (res.D0[1] or (res.D0[2])):
rospy.logerr("磁铁异常,正在关闭磁铁")
set_magnet(False)
#rospy.logwarn("磁铁异常,正在关闭磁铁")
return True
else:
return False
......@@ -46,5 +111,22 @@ if __name__ == '__main__':
# 获取夹爪io状态
get_io_state = rospy.ServiceProxy('get_io_states', sage_arm_msgs.srv.GetIoStates)
#main
rospy.spin()
\ No newline at end of file
rospy.set_param("run_task_state_check", True) # 默认为 True,表示运行状态检测
#main
# 设置循环的频率,例如每分钟10次
#hz与时间的转换
loop_time_sec = 1 #默认30
loop_HZ = 1/ loop_time_sec
loop_rate = rospy.Rate(loop_HZ) # 10Hz
#上一次状态保存
task_state_last = False
task_count = 0
while not rospy.is_shutdown():
if rospy.get_param("run_task_state_check", True):
#由状态检测发起,30s一次,检测20次10分钟,即10分钟没有变化,认为有磁铁长时间上磁风险
task_state_check(20)
loop_rate.sleep()
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment