0편 – [robot arm, manipulator 프로젝트][ubuntu 20.04, ros2, gazebo][0] – ros2, gazebo 설치 및 예제 실행
1편 – [robot arm, manipulator 프로젝트][ubuntu 20.04, ros2, gazebo][1] – ros2 publisher subscriber c++, python 구현 및 실행
2편 – [robot arm, manipulator 프로젝트][ubuntu 20.04, ros2, gazebo][2] – ros2 와 아두이노 통신 (micro ros arduino), OpenCR, robotis, dynamixel 모터 개발 환경 구성 및 예제
1. 원하는 모터 값을 pub 하는 ros2 노드 구현
아두이노 코드는 아래와 같다
#include <micro_ros_arduino.h>
#include <DynamixelSDK.h>
#include <stdio.h>
#include <rcl/rcl.h>
#include <rcl/error_handling.h>
#include <rclc/rclc.h>
#include <rclc/executor.h>
// Include the required message types
#include <std_msgs/msg/int32.h>
#include <std_msgs/msg/int32_multi_array.h>
rcl_subscription_t subscriber;
rcl_publisher_t publisher;
std_msgs__msg__Int32 sub_msg;
std_msgs__msg__Int32MultiArray pub_msg;
rclc_executor_t executor;
rclc_support_t support;
rcl_allocator_t allocator;
rcl_node_t node;
#define LED_PIN 13
// Control Table Address (XL430)
#define ADDR_XL_TORQUE_ENABLE 64 // Torque Enable Address
#define ADDR_XL_GOAL_POSITION 116 // Goal Position Address
#define ADDR_XL_PRESENT_POSITION 132 // Present Position Address
// Data Byte Length
#define LEN_XL_GOAL_POSITION 4
#define LEN_XL_PRESENT_POSITION 4
// Protocol version
#define PROTOCOL_VERSION 2.0 // XL430 uses Protocol 2.0
// Default setting
#define DXL1_ID 13 // Dynamixel#1 ID
#define DXL2_ID 14 // Dynamixel#2 ID
#define BAUDRATE 57600 // Baudrate
#define DEVICENAME "OpenCR_DXL_Port" // Serial Port
#define TORQUE_ENABLE 1 // Value for enabling the torque
#define TORQUE_DISABLE 0 // Value for disabling the torque
#define DXL_MINIMUM_POSITION_VALUE 0 // Minimum position value (in pulse)
#define DXL_MAXIMUM_POSITION_VALUE 4095 // Maximum position value (in pulse)
#define DXL_MOVING_STATUS_THRESHOLD 10 // Moving status threshold
#define RCCHECK(fn) { rcl_ret_t temp_rc = fn; if((temp_rc != RCL_RET_OK)){error_loop();}}
#define RCSOFTCHECK(fn) { rcl_ret_t temp_rc = fn; if((temp_rc != RCL_RET_OK)){}}
dynamixel::PortHandler *portHandler;
dynamixel::PacketHandler *packetHandler;
dynamixel::GroupSyncWrite *groupSyncWrite;
dynamixel::GroupSyncRead *groupSyncRead;
int32_t dxl_present_positions[2];
void error_loop() {
while (1) {
digitalWrite(LED_PIN, !digitalRead(LED_PIN));
delay(100);
}
}
// Helper to clamp values
int clamp(int value, int min_value, int max_value) {
if (value < min_value) return min_value;
if (value > max_value) return max_value;
return value;
}
// Subscription Callback
void subscription_callback(const void *msgin) {
const std_msgs__msg__Int32 *msg = (const std_msgs__msg__Int32 *)msgin;
// Clamp the received value within the allowed range
int target_position = clamp(msg->data, DXL_MINIMUM_POSITION_VALUE, DXL_MAXIMUM_POSITION_VALUE);
// Prepare goal position parameter
uint8_t param_goal_position[4];
param_goal_position[0] = DXL_LOBYTE(DXL_LOWORD(target_position));
param_goal_position[1] = DXL_HIBYTE(DXL_LOWORD(target_position));
param_goal_position[2] = DXL_LOBYTE(DXL_HIWORD(target_position));
param_goal_position[3] = DXL_HIBYTE(DXL_HIWORD(target_position));
// Add goal position for DXL1 and DXL2
groupSyncWrite->addParam(DXL1_ID, param_goal_position);
groupSyncWrite->addParam(DXL2_ID, param_goal_position);
// Write goal positions
groupSyncWrite->txPacket();
groupSyncWrite->clearParam();
// Read current positions
groupSyncRead->txRxPacket();
dxl_present_positions[0] = groupSyncRead->getData(DXL1_ID, ADDR_XL_PRESENT_POSITION, LEN_XL_PRESENT_POSITION);
dxl_present_positions[1] = groupSyncRead->getData(DXL2_ID, ADDR_XL_PRESENT_POSITION, LEN_XL_PRESENT_POSITION);
// Publish the current positions as an array
pub_msg.data.data[0] = dxl_present_positions[0];
pub_msg.data.data[1] = dxl_present_positions[1];
RCSOFTCHECK(rcl_publish(&publisher, &pub_msg, NULL));
}
void setup() {
// Setup ROS transport
set_microros_transports();
pinMode(LED_PIN, OUTPUT);
digitalWrite(LED_PIN, HIGH);
delay(2000);
allocator = rcl_get_default_allocator();
// Initialize support
RCCHECK(rclc_support_init(&support, 0, NULL, &allocator));
// Create node
RCCHECK(rclc_node_init_default(&node, "micro_ros_arduino_node", "", &support));
// Create subscriber
RCCHECK(rclc_subscription_init_default(
&subscriber,
&node,
ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Int32),
"micro_ros_arduino_subscriber"));
// Create publisher
RCCHECK(rclc_publisher_init_default(
&publisher,
&node,
ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Int32MultiArray),
"micro_ros_arduino_publisher"));
// Create executor
RCCHECK(rclc_executor_init(&executor, &support.context, 2, &allocator));
RCCHECK(rclc_executor_add_subscription(&executor, &subscriber, &sub_msg, &subscription_callback, ON_NEW_DATA));
// Initialize Dynamixel Communication
portHandler = dynamixel::PortHandler::getPortHandler(DEVICENAME);
packetHandler = dynamixel::PacketHandler::getPacketHandler(PROTOCOL_VERSION);
groupSyncWrite = new dynamixel::GroupSyncWrite(portHandler, packetHandler, ADDR_XL_GOAL_POSITION, LEN_XL_GOAL_POSITION);
groupSyncRead = new dynamixel::GroupSyncRead(portHandler, packetHandler, ADDR_XL_PRESENT_POSITION, LEN_XL_PRESENT_POSITION);
if (!portHandler->openPort()) error_loop();
if (!portHandler->setBaudRate(BAUDRATE)) error_loop();
uint8_t dxl_error;
int dxl_comm_result;
// Enable motor torque
for (int id : {DXL1_ID, DXL2_ID}) {
dxl_comm_result = packetHandler->write1ByteTxRx(portHandler, id, ADDR_XL_TORQUE_ENABLE, TORQUE_ENABLE, &dxl_error);
if (dxl_comm_result != COMM_SUCCESS || dxl_error != 0) error_loop();
groupSyncRead->addParam(id);
}
// Initialize publisher message
pub_msg.data.data = (int32_t *)malloc(2 * sizeof(int32_t)); // Allocate memory for two integers
pub_msg.data.size = 2; // Array size
pub_msg.data.capacity = 2; // Array capacity
}
void loop() {
RCCHECK(rclc_executor_spin_some(&executor, RCL_MS_TO_NS(100)));
delay(100); // Small delay to avoid busy loop
}
이제 motor angle 을 publish 하는 코드를 작성하자
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import Int32
import math
class Pub(Node):
def __init__(self):
super().__init__('pub')
qos_profile = QoSProfile(depth=10)
self.publisher_ = self.create_publisher(Int32, '/micro_ros_arduino_subscriber', qos_profile)
timer_period = 0.1 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.angle = 0 # Initial angle in degrees
def timer_callback(self):
msg = Int32()
# Calculate sin(angle) and convert it to integer
msg.data = int(1000 * math.sin(math.radians(self.angle))+ 2028) # Scale to range [-1000, 1000]
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%d"' % msg.data)
# Increment angle and wrap around after 360 degrees
self.angle = (self.angle + 5) % 360
def main(args=None):
rclpy.init(args=args)
node = Pub()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
sin 각도를 pub 하는 코드이다. 코드를 작성한 후
setup.py 코드를 위와 같이 수정해준다. (entry_points 부분이 수정되었다.)
터미널을 켜서 아래 코드를 수행한다.
cd ~/ros2_ws/
colcon build
source /opt/ros/foxy/setup.bash
source ~/ros2_ws/install/local_setup.bash
ros2 run py_pubsub motor1
아래 영상 처럼 모터가 ros2 에서 pub 하는 값에 따라 잘 움직이는 것을 볼 수 있다.
2.
#include <micro_ros_arduino.h>
#include <DynamixelSDK.h>
#include <stdio.h>
#include <rcl/rcl.h>
#include <rcl/error_handling.h>
#include <rclc/rclc.h>
#include <rclc/executor.h>
// Include the required message types
#include <std_msgs/msg/int32.h>
#include <std_msgs/msg/int32_multi_array.h>
rcl_subscription_t subscriber;
rcl_publisher_t publisher;
std_msgs__msg__Int32 sub_msg;
std_msgs__msg__Int32MultiArray pub_msg;
rclc_executor_t executor;
rclc_support_t support;
rcl_allocator_t allocator;
rcl_node_t node;
#define LED_PIN 13
// Control Table Address (XL430)
#define ADDR_XL_TORQUE_ENABLE 64
#define ADDR_XL_GOAL_POSITION 116
#define ADDR_XL_PRESENT_POSITION 132
// Data Byte Length
#define LEN_XL_GOAL_POSITION 4
#define LEN_XL_PRESENT_POSITION 4
// Protocol version
#define PROTOCOL_VERSION 2.0
// Default setting
#define DXL1_ID 13
#define DXL2_ID 14
#define BAUDRATE 57600
#define DEVICENAME "OpenCR_DXL_Port"
#define TORQUE_ENABLE 1
#define TORQUE_DISABLE 0
#define DXL_MINIMUM_POSITION_VALUE 0
#define DXL_MAXIMUM_POSITION_VALUE 4095
#define DXL_MOVING_STATUS_THRESHOLD 10
#define RCCHECK(fn) { rcl_ret_t temp_rc = fn; if((temp_rc != RCL_RET_OK)){error_loop();}}
#define RCSOFTCHECK(fn) { rcl_ret_t temp_rc = fn; if((temp_rc != RCL_RET_OK)){}}
dynamixel::PortHandler *portHandler;
dynamixel::PacketHandler *packetHandler;
dynamixel::GroupSyncWrite *groupSyncWrite;
dynamixel::GroupSyncRead *groupSyncRead;
int32_t dxl_present_positions[2];
void error_loop() {
while (1) {
digitalWrite(LED_PIN, !digitalRead(LED_PIN));
delay(100);
}
}
int clamp(int value, int min_value, int max_value) {
if (value < min_value) return min_value;
if (value > max_value) return max_value;
return value;
}
bool led_state = false;
void subscription_callback(const void *msgin) {
const std_msgs__msg__Int32 *msg = (const std_msgs__msg__Int32 *)msgin;
led_state = !led_state;
digitalWrite(24, led_state ? HIGH : LOW);
// Extract data for both motors
int raw_data = msg->data;
int target_position1 = clamp(raw_data % 10000, DXL_MINIMUM_POSITION_VALUE, DXL_MAXIMUM_POSITION_VALUE);
int target_position2 = clamp(raw_data / 10000, DXL_MINIMUM_POSITION_VALUE, DXL_MAXIMUM_POSITION_VALUE);
// Prepare goal position parameters
uint8_t param_goal_position1[4];
param_goal_position1[0] = DXL_LOBYTE(DXL_LOWORD(target_position1));
param_goal_position1[1] = DXL_HIBYTE(DXL_LOWORD(target_position1));
param_goal_position1[2] = DXL_LOBYTE(DXL_HIWORD(target_position1));
param_goal_position1[3] = DXL_HIBYTE(DXL_HIWORD(target_position1));
uint8_t param_goal_position2[4];
param_goal_position2[0] = DXL_LOBYTE(DXL_LOWORD(target_position2));
param_goal_position2[1] = DXL_HIBYTE(DXL_LOWORD(target_position2));
param_goal_position2[2] = DXL_LOBYTE(DXL_HIWORD(target_position2));
param_goal_position2[3] = DXL_HIBYTE(DXL_HIWORD(target_position2));
// Add parameters to sync write
groupSyncWrite->addParam(DXL1_ID, param_goal_position1);
groupSyncWrite->addParam(DXL2_ID, param_goal_position2);
// Write goal positions
groupSyncWrite->txPacket();
groupSyncWrite->clearParam();
// Read current positions
groupSyncRead->txRxPacket();
dxl_present_positions[0] = groupSyncRead->getData(DXL1_ID, ADDR_XL_PRESENT_POSITION, LEN_XL_PRESENT_POSITION);
dxl_present_positions[1] = groupSyncRead->getData(DXL2_ID, ADDR_XL_PRESENT_POSITION, LEN_XL_PRESENT_POSITION);
// Publish the current positions
pub_msg.data.data[0] = dxl_present_positions[0];
pub_msg.data.data[1] = dxl_present_positions[1];
RCSOFTCHECK(rcl_publish(&publisher, &pub_msg, NULL));
}
void setup() {
set_microros_transports();
pinMode(22, OUTPUT);
pinMode(23, OUTPUT);
pinMode(24, OUTPUT);
pinMode(25, OUTPUT);
delay(2000);
allocator = rcl_get_default_allocator();
RCCHECK(rclc_support_init(&support, 0, NULL, &allocator));
RCCHECK(rclc_node_init_default(&node, "micro_ros_arduino_node", "", &support));
RCCHECK(rclc_subscription_init_default(
&subscriber,
&node,
ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Int32),
"micro_ros_arduino_subscriber"));
RCCHECK(rclc_publisher_init_default(
&publisher,
&node,
ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Int32MultiArray),
"micro_ros_arduino_publisher"));
RCCHECK(rclc_executor_init(&executor, &support.context, 2, &allocator));
RCCHECK(rclc_executor_add_subscription(&executor, &subscriber, &sub_msg, &subscription_callback, ON_NEW_DATA));
portHandler = dynamixel::PortHandler::getPortHandler(DEVICENAME);
packetHandler = dynamixel::PacketHandler::getPacketHandler(PROTOCOL_VERSION);
groupSyncWrite = new dynamixel::GroupSyncWrite(portHandler, packetHandler, ADDR_XL_GOAL_POSITION, LEN_XL_GOAL_POSITION);
groupSyncRead = new dynamixel::GroupSyncRead(portHandler, packetHandler, ADDR_XL_PRESENT_POSITION, LEN_XL_PRESENT_POSITION);
if (!portHandler->openPort()) error_loop();
if (!portHandler->setBaudRate(BAUDRATE)) error_loop();
uint8_t dxl_error;
int dxl_comm_result;
for (int id : {DXL1_ID, DXL2_ID}) {
dxl_comm_result = packetHandler->write1ByteTxRx(portHandler, id, ADDR_XL_TORQUE_ENABLE, TORQUE_ENABLE, &dxl_error);
if (dxl_comm_result != COMM_SUCCESS || dxl_error != 0) error_loop();
groupSyncRead->addParam(id);
}
pub_msg.data.data = (int32_t *)malloc(2 * sizeof(int32_t));
pub_msg.data.size = 2;
pub_msg.data.capacity = 2;
}
void loop() {
RCCHECK(rclc_executor_spin_some(&executor, RCL_MS_TO_NS(100)));
delay(100);
}
현재 micro-ros 를 가지고 string 형이나 Int32multiArray 형식으로 msg 를 받는 것에 문제가 있어 간접적으로 모터 값을 받도록 했다.
30330203 과 같은 8자리의 int32 정수를 sub 하여 이를 4개씩 잘라서 사용하는 식으로 2개의 모터의 서로 다른 입력을 줄 수 있도록 하였다.
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import Int32
import math
class Pub(Node):
def __init__(self):
super().__init__('pub')
qos_profile = QoSProfile(depth=10)
self.publisher_ = self.create_publisher(Int32, '/micro_ros_arduino_subscriber', qos_profile)
timer_period = 0.1 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.angle1 = 0 # Initial angle for motor 1 in degrees
self.angle2 = 90 # Initial angle for motor 2 in degrees (90 degrees phase offset)
def timer_callback(self):
msg = Int32()
# Calculate sine values for two angles and scale to the range [1028, 3028]
motor1_input = int(1000 * math.sin(math.radians(self.angle1)) + 2028)
motor2_input = int(1000 * math.sin(math.radians(self.angle2)) + 2028)
# Combine motor1_input and motor2_input into a single value
combined_value = motor1_input * 10000 + motor2_input
# Set the message data
msg.data = combined_value
print(f"Publishing combined value: {combined_value} (Motor1: {motor1_input}, Motor2: {motor2_input})")
self.publisher_.publish(msg)
self.get_logger().info(f'Publishing: Combined Value: {combined_value} (Motor1: {motor1_input}, Motor2: {motor2_input})')
# Increment angles and wrap around after 360 degrees
self.angle1 = (self.angle1 + 5) % 360
self.angle2 = (self.angle2 + 5) % 360
def main(args=None):
rclpy.init(args=args)
node = Pub()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
위에 코드는 ros2 에서 2개의 모터의 서로 다른 입력을 주기 위한 모터 값을 pub 하는 python 코드이다.
파일을 추가하고 그에 따라 setup.py 파일도 수정해준다. 그리고 아래 코드들을 실행한다.
cd ~/ros2_ws/
colcon build
source /opt/ros/foxy/setup.bash
source ~/ros2_ws/install/local_setup.bash
ros2 run py_pubsub motor2
이에 대한 영상 이다.
0 Comments