0편 – [robot arm, manipulator 프로젝트][ubuntu 20.04, ros2, gazebo][0] – ros2, gazebo 설치 및 예제 실행
1편 – [robot arm, manipulator 프로젝트][ubuntu 20.04, ros2, gazebo][1] – ros2 publisher subscriber c++, python 구현 및 실행
2편 – [robot arm, manipulator 프로젝트][ubuntu 20.04, ros2, gazebo][2] – ros2 와 아두이노 통신 (micro ros arduino), OpenCR, robotis, dynamixel 모터 개발 환경 구성 및 예제
3편 – [robot arm, manipulator 프로젝트][ubuntu 20.04, ros2, gazebo][3] – ros2 와 아두이노(OpenCR)로 여러개의 dynamixel 모터 제어 및 엔코더 값 받기, robotis
4편 – [robot arm, manipulator 프로젝트][ubuntu 20.04, ROS2, gazebo][4] – ROS2 u2d2 를 활용한 모터 제어, dynamixel SDK tutorial
2편, 3편 생략 가능
1. 모터 2개 동시에 움직이기
3개의 코드가 존재한다.
- 각각의 모터를 어떤 값으로 움직일지 desired 각도를 pub 하는 코드
- pub 된 모터 각도를 sub 하여 식제 모터를 움직이는 코드
- 모터 엔코더로부터 모터의 현재 각도를 입력받아 pub 하는 코드
- 모터의 현재 각도를 sub 하여 실시간으로 plot 하는 코드
결국 전체적인 흐름도는 위와 같다.
ROS2 를 활용하여 manipulator 각 모터의 각도 입력 – U2D2 를 거쳐서 실제 dynamixel 모터에 각도 입력 – 실제 dynamixel 모터로부터 엔코더 각도 값을 피드백 – 각 모터의 각도값을 plot
1번 코드
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import Int32MultiArray
import math
class Pub(Node):
def __init__(self):
super().__init__('pub')
qos_profile = QoSProfile(depth=10)
self.publisher_ = self.create_publisher(Int32MultiArray, '/cmd_positions', qos_profile)
timer_period = 0.1 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.angle1 = 0 # Initial angle for motor 1 in degrees
self.angle2 = 90 # Initial angle for motor 2 in degrees (90 degrees phase offset)
def timer_callback(self):
msg = Int32MultiArray()
msg.data = [] # Initialize the data array
# Calculate sine values for two angles and scale to the range [1028, 3028]
motor1_input = int(1000 * math.sin(math.radians(self.angle1)) + 2028)
motor2_input = int(100 * math.sin(math.radians(self.angle2)) + 2028)
# Append motor values to the message
msg.data.append(motor1_input)
msg.data.append(motor2_input)
# Publish the message
print(f"Publishing values: Motor1: {motor1_input}, Motor2: {motor2_input}")
self.publisher_.publish(msg)
self.get_logger().info(f'Publishing: Motor1: {motor1_input}, Motor2: {motor2_input}')
# Increment angles and wrap around after 360 degrees
self.angle1 = (self.angle1 + 5) % 360
self.angle2 = (self.angle2 + 5) % 360
def main(args=None):
rclpy.init(args=args)
node = Pub()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
\+ setup.py 에서 entry_points 추가하는 것 까먹지 말기
cd ~/ros2_ws/
colcon build
source /opt/ros/foxy/setup.bash
source ~/ros2_ws/install/local_setup.bash
ros2 run py_pubsub motor22
2. 번 3. 번 코드는 같이
#include <rclcpp/rclcpp.hpp>
#include "std_msgs/msg/int32_multi_array.hpp"
#include "dynamixel_sdk/dynamixel_sdk.h"
rclcpp::Node::SharedPtr node = nullptr;
using namespace dynamixel;
// Control table address
#define ADDR_OPERATING_MODE 11 // 제어모드
#define ADDR_TORQUE_ENABLE 64 // Torque on/off 제어
#define ADDR_GOAL_POSITION 116 // 위치 제어
#define ADDR_PRESENT_POSITION 132 // 현재 위치
// Protocol version
#define PROTOCOL_VERSION 2.0 // Default Protocol version of DYNAMIXEL X series.
// Default setting
#define BAUDRATE 57600 // Default Baudrate of DYNAMIXEL X series
#define DEVICE_NAME "/dev/ttyUSB0" // Port name
#define VALUE 2048 // 초기 목표 위치 (중간값)
#define OPERATING_VALUE 3 // 제어모드, 위치제어: 3
PortHandler *portHandler;
PacketHandler *packetHandler;
GroupSyncWrite *groupSyncWrite;
GroupSyncRead *groupSyncRead;
std::vector<uint8_t> motor_ids = {13, 14}; // 모터 ID 목록
rclcpp::Publisher<std_msgs::msg::Int32MultiArray>::SharedPtr position_publisher;
void publishCurrentPositions() {
std_msgs::msg::Int32MultiArray msg;
msg.data.clear();
// Read positions for all motors
if (groupSyncRead->txRxPacket() == COMM_SUCCESS) {
for (uint8_t id : motor_ids) {
if (groupSyncRead->isAvailable(id, ADDR_PRESENT_POSITION, 4)) {
int32_t position = groupSyncRead->getData(id, ADDR_PRESENT_POSITION, 4);
msg.data.push_back(position);
} else {
RCLCPP_ERROR(node->get_logger(), "Failed to get position data for ID %d", id);
return;
}
}
position_publisher->publish(msg);
RCLCPP_INFO(node->get_logger(), "Published current positions");
} else {
RCLCPP_ERROR(node->get_logger(), "Failed to read positions");
}
}
void topic_callback(const std_msgs::msg::Int32MultiArray::SharedPtr msg)
{
if (msg->data.size() != motor_ids.size()) {
RCLCPP_ERROR(node->get_logger(), "Received array does not match the number of motors.");
return;
}
groupSyncWrite->clearParam();
for (size_t i = 0; i < motor_ids.size(); i++) {
int position = msg->data[i];
// 위치 값을 클램핑
if (position < 0) position = 0;
if (position > 4095) position = 4095;
// 4바이트 데이터 준비
uint8_t param_goal_position[4];
param_goal_position[0] = DXL_LOBYTE(DXL_LOWORD(position));
param_goal_position[1] = DXL_HIBYTE(DXL_LOWORD(position));
param_goal_position[2] = DXL_LOBYTE(DXL_HIWORD(position));
param_goal_position[3] = DXL_HIBYTE(DXL_HIWORD(position));
// GroupSyncWrite에 데이터 추가
if (!groupSyncWrite->addParam(motor_ids[i], param_goal_position)) {
RCLCPP_ERROR(node->get_logger(), "Failed to add parameter for motor ID %d", motor_ids[i]);
return;
}
}
// GroupSyncWrite 패킷 전송
int dxl_comm_result = groupSyncWrite->txPacket();
if (dxl_comm_result != COMM_SUCCESS) {
RCLCPP_ERROR(node->get_logger(), "GroupSyncWrite failed: %s", packetHandler->getTxRxResult(dxl_comm_result));
} else {
RCLCPP_INFO(node->get_logger(), "Updated positions for all motors.");
publishCurrentPositions(); // Publish current positions after movement
}
}
int main(int argc, char **argv)
{
uint8_t dxl_error = 0;
rclcpp::init(argc, argv);
node = rclcpp::Node::make_shared("Dynamixel");
// 포트 핸들러 초기화
portHandler = dynamixel::PortHandler::getPortHandler(DEVICE_NAME);
packetHandler = dynamixel::PacketHandler::getPacketHandler(PROTOCOL_VERSION);
groupSyncWrite = new GroupSyncWrite(portHandler, packetHandler, ADDR_GOAL_POSITION, 4);
groupSyncRead = new GroupSyncRead(portHandler, packetHandler, ADDR_PRESENT_POSITION, 4);
// Add parameters for sync read
for (uint8_t id : motor_ids) {
if (!groupSyncRead->addParam(id)) {
RCLCPP_ERROR(node->get_logger(), "Failed to add parameter for GroupSyncRead for ID %d", id);
return -1;
}
}
// 포트 열기
if (!portHandler->openPort()) {
RCLCPP_ERROR(node->get_logger(), "Failed to open the port!");
return -1;
}
// 통신 속도 설정
if (!portHandler->setBaudRate(BAUDRATE)) {
RCLCPP_ERROR(node->get_logger(), "Failed to set the baudrate!");
return -1;
}
// 모터 초기화
for (uint8_t id : motor_ids) {
// 제어 모드 설정
packetHandler->write1ByteTxRx(portHandler, id, ADDR_OPERATING_MODE, OPERATING_VALUE, &dxl_error);
if (dxl_error != 0) {
RCLCPP_ERROR(node->get_logger(), "Failed to set operating mode for ID %d!", id);
return -1;
}
// 토크 활성화
packetHandler->write1ByteTxRx(portHandler, id, ADDR_TORQUE_ENABLE, 1, &dxl_error);
if (dxl_error != 0) {
RCLCPP_ERROR(node->get_logger(), "Failed to enable torque for ID %d!", id);
return -1;
}
// 초기 위치 설정
packetHandler->write4ByteTxRx(portHandler, id, ADDR_GOAL_POSITION, VALUE, &dxl_error);
}
RCLCPP_INFO(node->get_logger(), "All Dynamixels initialized in position control mode.");
// Create publisher for current positions
position_publisher = node->create_publisher<std_msgs::msg::Int32MultiArray>("/current_positions", 10);
// Create timer for periodic position publishing
auto timer = node->create_wall_timer(
std::chrono::milliseconds(100),
publishCurrentPositions
);
// Subscriber 생성
auto subscription = node->create_subscription<std_msgs::msg::Int32MultiArray>(
"/cmd_positions", 10, topic_callback);
// ROS 2 실행
rclcpp::spin(node);
// 정리 및 토크 비활성화
for (uint8_t id : motor_ids) {
packetHandler->write1ByteTxRx(portHandler, id, ADDR_TORQUE_ENABLE, 0, &dxl_error);
}
rclcpp::shutdown();
// 포트 닫기
portHandler->closePort();
delete groupSyncWrite;
delete groupSyncRead;
return 0;
}
cmakelists.txt 수정해주는 것 까먹지 말기
cd ~/ros2_ws/
colcon build
source /opt/ros/foxy/setup.bash
source ~/ros2_ws/install/local_setup.bash
ros2 run test_dynamixel test_move_pos2 /dev/ttyUSB0
4번 코드
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import Int32MultiArray
import matplotlib.pyplot as plt
from collections import deque
import numpy as np
class PlotCurrentPositions(Node):
def __init__(self):
super().__init__('plot_current_positions')
# Create subscriber with QoS profile
qos_profile = QoSProfile(depth=10)
self.subscription = self.create_subscription(
Int32MultiArray,
'/current_positions',
self.listener_callback,
qos_profile)
# Initialize plot
plt.ion() # Enable interactive mode
self.fig, self.ax = plt.subplots()
self.lines = []
colors = ['b', 'r'] # Colors for each motor
# Create deques with max length 1000 for each motor
self.positions = [deque([0]*1000, maxlen=1000), deque([0]*1000, maxlen=1000)]
self.time_steps = deque(range(1000), maxlen=1000)
# Initialize lines for each motor
for i in range(2):
line, = self.ax.plot(list(self.time_steps), list(self.positions[i]),
colors[i], label=f'Motor {i+1}')
self.lines.append(line)
# Set plot properties
self.ax.set_xlabel('Time Steps')
self.ax.set_ylabel('Position')
self.ax.set_title('Motor Positions Over Time')
self.ax.legend()
self.ax.grid(True)
# Set initial axis limits
self.ax.set_xlim(0, 1000)
self.ax.set_ylim(0, 4096) # Full range of Dynamixel positions
def listener_callback(self, msg):
# Update position data
for i, pos in enumerate(msg.data):
self.positions[i].append(pos)
# Update time steps
self.time_steps.append(self.time_steps[-1] + 1)
# Update plot data
for i, line in enumerate(self.lines):
x_data = list(self.time_steps)
y_data = list(self.positions[i])
if len(x_data) == len(y_data): # Ensure data lengths match
line.set_data(x_data, y_data)
# Adjust x-axis limits to show latest data
self.ax.set_xlim(self.time_steps[0], self.time_steps[-1])
# Draw plot
self.fig.canvas.draw()
self.fig.canvas.flush_events()
def main(args=None):
rclpy.init(args=args)
plot_node = PlotCurrentPositions()
try:
rclpy.spin(plot_node)
except KeyboardInterrupt:
plot_node.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
plot_node.destroy_node()
rclpy.shutdown()
plt.ioff()
plt.close()
if __name__ == '__main__':
main()
cd ~/ros2_ws/
colcon build
source /opt/ros/foxy/setup.bash
source ~/ros2_ws/install/local_setup.bash
ros2 run py_pubsub plot_angle
결과 영상 (3dof manipulator 에 적용)
motor 입력 생성 코드 (위 영상과 다른 코드)
– 제어주기를 4ms 로 하여 위 영상보다 부드럽게 움직임
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import Int32MultiArray
import math
class Pub(Node):
def __init__(self):
super().__init__('pub')
qos_profile = QoSProfile(depth=10)
self.publisher_ = self.create_publisher(
Int32MultiArray, '/cmd_positions', qos_profile)
self.time_period = 0.004 # 4ms control period
self.timer = self.create_timer(self.time_period, self.timer_callback)
self.operation_t = 0.0 # Operation time in seconds
self.angle1 = 0 # Initial angle for motor 1 in degrees
self.angle2 = 0 # Initial angle for motor 2 in degrees
self.angle3 = 0
def timer_callback(self):
msg = Int32MultiArray()
msg.data = [] # Initialize the data array
# Calculate sine values using operation time
# Scale to motor resolution (4096) and add offset (2048)
freq = 0.5 # 0.5 Hz = 2 second period
motor1_input = int(400 * math.sin(2*math.pi*freq*self.operation_t) + 2048)
motor2_input = int(200 * math.sin(2*math.pi*freq*self.operation_t) + 2048)
motor3_input = int(300 * math.sin(2*math.pi*freq*self.operation_t) + 2048)
# Clamp values between 0 and 4095
motor1_input = max(0, min(4095, motor1_input))
motor2_input = max(0, min(4095, motor2_input))
motor3_input = max(0, min(4095, motor3_input))
# Append motor values to the message
msg.data.append(motor1_input)
msg.data.append(motor2_input)
msg.data.append(motor3_input)
# Publish the message
self.publisher_.publish(msg)
self.get_logger().info(
f'Time: {self.operation_t:.3f}s, '
f'Motors: {motor1_input}, {motor2_input}, {motor3_input}'
)
# Increment operation time
self.operation_t += self.time_period
def main(args=None):
rclpy.init(args=args)
node = Pub()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
환경 구성 완료 후 실행해야할 코드들 정리 (터미널 3개)
- source /opt/ros/foxy/setup.bash
source ~/ros2_ws/install/local_setup.bash
ros2 run test_dynamixel test_move_pos3 /dev/ttyUSB0 - source /opt/ros/foxy/setup.bash
source ~/ros2_ws/install/local_setup.bash
ros2 run py_pubsub manipulator_test - source /opt/ros/foxy/setup.bash
source ~/ros2_ws/install/local_setup.bash
ros2 run py_pubsub plot_angle
0 Comments