0편 – [robot arm, manipulator 프로젝트][ubuntu 20.04, ros2, gazebo][0] – ros2, gazebo 설치 및 예제 실행
1편 – [robot arm, manipulator 프로젝트][ubuntu 20.04, ros2, gazebo][1] – ros2 publisher subscriber c++, python 구현 및 실행
2편 – [robot arm, manipulator 프로젝트][ubuntu 20.04, ros2, gazebo][2] – ros2 와 아두이노 통신 (micro ros arduino), OpenCR, robotis, dynamixel 모터 개발 환경 구성 및 예제

1. 원하는 모터 값을 pub 하는 ros2 노드 구현

아두이노 코드는 아래와 같다

#include <micro_ros_arduino.h>
#include <DynamixelSDK.h>

#include <stdio.h>
#include <rcl/rcl.h>
#include <rcl/error_handling.h>
#include <rclc/rclc.h>
#include <rclc/executor.h>

// Include the required message types
#include <std_msgs/msg/int32.h>
#include <std_msgs/msg/int32_multi_array.h>

rcl_subscription_t subscriber;
rcl_publisher_t publisher;
std_msgs__msg__Int32 sub_msg;
std_msgs__msg__Int32MultiArray pub_msg;

rclc_executor_t executor;
rclc_support_t support;
rcl_allocator_t allocator;
rcl_node_t node;

#define LED_PIN 13

// Control Table Address (XL430)
#define ADDR_XL_TORQUE_ENABLE          64  // Torque Enable Address
#define ADDR_XL_GOAL_POSITION          116 // Goal Position Address
#define ADDR_XL_PRESENT_POSITION       132 // Present Position Address

// Data Byte Length
#define LEN_XL_GOAL_POSITION           4
#define LEN_XL_PRESENT_POSITION        4

// Protocol version
#define PROTOCOL_VERSION                2.0 // XL430 uses Protocol 2.0

// Default setting
#define DXL1_ID                         13   // Dynamixel#1 ID
#define DXL2_ID                         14   // Dynamixel#2 ID
#define BAUDRATE                        57600 // Baudrate
#define DEVICENAME                      "OpenCR_DXL_Port" // Serial Port

#define TORQUE_ENABLE                   1    // Value for enabling the torque
#define TORQUE_DISABLE                  0    // Value for disabling the torque
#define DXL_MINIMUM_POSITION_VALUE      0    // Minimum position value (in pulse)
#define DXL_MAXIMUM_POSITION_VALUE      4095 // Maximum position value (in pulse)
#define DXL_MOVING_STATUS_THRESHOLD     10   // Moving status threshold

#define RCCHECK(fn) { rcl_ret_t temp_rc = fn; if((temp_rc != RCL_RET_OK)){error_loop();}}
#define RCSOFTCHECK(fn) { rcl_ret_t temp_rc = fn; if((temp_rc != RCL_RET_OK)){}}

dynamixel::PortHandler *portHandler;
dynamixel::PacketHandler *packetHandler;
dynamixel::GroupSyncWrite *groupSyncWrite;
dynamixel::GroupSyncRead *groupSyncRead;

int32_t dxl_present_positions[2];

void error_loop() {
  while (1) {
    digitalWrite(LED_PIN, !digitalRead(LED_PIN));
    delay(100);
  }
}

// Helper to clamp values
int clamp(int value, int min_value, int max_value) {
  if (value < min_value) return min_value;
  if (value > max_value) return max_value;
  return value;
}

// Subscription Callback
void subscription_callback(const void *msgin) {  
  const std_msgs__msg__Int32 *msg = (const std_msgs__msg__Int32 *)msgin;

  // Clamp the received value within the allowed range
  int target_position = clamp(msg->data, DXL_MINIMUM_POSITION_VALUE, DXL_MAXIMUM_POSITION_VALUE);

  // Prepare goal position parameter
  uint8_t param_goal_position[4];
  param_goal_position[0] = DXL_LOBYTE(DXL_LOWORD(target_position));
  param_goal_position[1] = DXL_HIBYTE(DXL_LOWORD(target_position));
  param_goal_position[2] = DXL_LOBYTE(DXL_HIWORD(target_position));
  param_goal_position[3] = DXL_HIBYTE(DXL_HIWORD(target_position));

  // Add goal position for DXL1 and DXL2
  groupSyncWrite->addParam(DXL1_ID, param_goal_position);
  groupSyncWrite->addParam(DXL2_ID, param_goal_position);

  // Write goal positions
  groupSyncWrite->txPacket();
  groupSyncWrite->clearParam();

  // Read current positions
  groupSyncRead->txRxPacket();
  dxl_present_positions[0] = groupSyncRead->getData(DXL1_ID, ADDR_XL_PRESENT_POSITION, LEN_XL_PRESENT_POSITION);
  dxl_present_positions[1] = groupSyncRead->getData(DXL2_ID, ADDR_XL_PRESENT_POSITION, LEN_XL_PRESENT_POSITION);

  // Publish the current positions as an array
  pub_msg.data.data[0] = dxl_present_positions[0];
  pub_msg.data.data[1] = dxl_present_positions[1];
  RCSOFTCHECK(rcl_publish(&publisher, &pub_msg, NULL));
}

void setup() {
  // Setup ROS transport
  set_microros_transports();

  pinMode(LED_PIN, OUTPUT);
  digitalWrite(LED_PIN, HIGH);  

  delay(2000);

  allocator = rcl_get_default_allocator();

  // Initialize support
  RCCHECK(rclc_support_init(&support, 0, NULL, &allocator));

  // Create node
  RCCHECK(rclc_node_init_default(&node, "micro_ros_arduino_node", "", &support));

  // Create subscriber
  RCCHECK(rclc_subscription_init_default(
    &subscriber,
    &node,
    ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Int32),
    "micro_ros_arduino_subscriber"));

  // Create publisher
  RCCHECK(rclc_publisher_init_default(
    &publisher,
    &node,
    ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Int32MultiArray),
    "micro_ros_arduino_publisher"));

  // Create executor
  RCCHECK(rclc_executor_init(&executor, &support.context, 2, &allocator));
  RCCHECK(rclc_executor_add_subscription(&executor, &subscriber, &sub_msg, &subscription_callback, ON_NEW_DATA));

  // Initialize Dynamixel Communication
  portHandler = dynamixel::PortHandler::getPortHandler(DEVICENAME);
  packetHandler = dynamixel::PacketHandler::getPacketHandler(PROTOCOL_VERSION);

  groupSyncWrite = new dynamixel::GroupSyncWrite(portHandler, packetHandler, ADDR_XL_GOAL_POSITION, LEN_XL_GOAL_POSITION);
  groupSyncRead = new dynamixel::GroupSyncRead(portHandler, packetHandler, ADDR_XL_PRESENT_POSITION, LEN_XL_PRESENT_POSITION);

  if (!portHandler->openPort()) error_loop();
  if (!portHandler->setBaudRate(BAUDRATE)) error_loop();

  uint8_t dxl_error;
  int dxl_comm_result;

  // Enable motor torque
  for (int id : {DXL1_ID, DXL2_ID}) {
    dxl_comm_result = packetHandler->write1ByteTxRx(portHandler, id, ADDR_XL_TORQUE_ENABLE, TORQUE_ENABLE, &dxl_error);
    if (dxl_comm_result != COMM_SUCCESS || dxl_error != 0) error_loop();
    groupSyncRead->addParam(id);
  }

  // Initialize publisher message
  pub_msg.data.data = (int32_t *)malloc(2 * sizeof(int32_t)); // Allocate memory for two integers
  pub_msg.data.size = 2; // Array size
  pub_msg.data.capacity = 2; // Array capacity
}

void loop() {
  RCCHECK(rclc_executor_spin_some(&executor, RCL_MS_TO_NS(100)));
  delay(100); // Small delay to avoid busy loop
}

이제 motor angle 을 publish 하는 코드를 작성하자

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile

from std_msgs.msg import Int32
import math

class Pub(Node):

    def __init__(self):
        super().__init__('pub')
        qos_profile = QoSProfile(depth=10)
        self.publisher_ = self.create_publisher(Int32, '/micro_ros_arduino_subscriber', qos_profile)
        timer_period = 0.1  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.angle = 0  # Initial angle in degrees

    def timer_callback(self):
        msg = Int32()
        # Calculate sin(angle) and convert it to integer
        msg.data = int(1000 * math.sin(math.radians(self.angle))+ 2028)  # Scale to range [-1000, 1000]
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%d"' % msg.data)
        
        # Increment angle and wrap around after 360 degrees
        self.angle = (self.angle + 5) % 360


def main(args=None):
    rclpy.init(args=args)
    node = Pub()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        node.get_logger().info('Keyboard Interrupt (SIGINT)')
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

sin 각도를 pub 하는 코드이다. 코드를 작성한 후

image 94

setup.py 코드를 위와 같이 수정해준다. (entry_points 부분이 수정되었다.)
터미널을 켜서 아래 코드를 수행한다.

cd ~/ros2_ws/
colcon build
source /opt/ros/foxy/setup.bash
source ~/ros2_ws/install/local_setup.bash
ros2 run py_pubsub motor1

아래 영상 처럼 모터가 ros2 에서 pub 하는 값에 따라 잘 움직이는 것을 볼 수 있다.

2.

#include <micro_ros_arduino.h>
#include <DynamixelSDK.h>

#include <stdio.h>
#include <rcl/rcl.h>
#include <rcl/error_handling.h>
#include <rclc/rclc.h>
#include <rclc/executor.h>

// Include the required message types
#include <std_msgs/msg/int32.h>
#include <std_msgs/msg/int32_multi_array.h>

rcl_subscription_t subscriber;
rcl_publisher_t publisher;
std_msgs__msg__Int32 sub_msg;
std_msgs__msg__Int32MultiArray pub_msg;

rclc_executor_t executor;
rclc_support_t support;
rcl_allocator_t allocator;
rcl_node_t node;

#define LED_PIN 13

// Control Table Address (XL430)
#define ADDR_XL_TORQUE_ENABLE          64
#define ADDR_XL_GOAL_POSITION          116
#define ADDR_XL_PRESENT_POSITION       132

// Data Byte Length
#define LEN_XL_GOAL_POSITION           4
#define LEN_XL_PRESENT_POSITION        4

// Protocol version
#define PROTOCOL_VERSION                2.0

// Default setting
#define DXL1_ID                         13
#define DXL2_ID                         14
#define BAUDRATE                        57600
#define DEVICENAME                      "OpenCR_DXL_Port"

#define TORQUE_ENABLE                   1
#define TORQUE_DISABLE                  0
#define DXL_MINIMUM_POSITION_VALUE      0
#define DXL_MAXIMUM_POSITION_VALUE      4095
#define DXL_MOVING_STATUS_THRESHOLD     10

#define RCCHECK(fn) { rcl_ret_t temp_rc = fn; if((temp_rc != RCL_RET_OK)){error_loop();}}
#define RCSOFTCHECK(fn) { rcl_ret_t temp_rc = fn; if((temp_rc != RCL_RET_OK)){}}

dynamixel::PortHandler *portHandler;
dynamixel::PacketHandler *packetHandler;
dynamixel::GroupSyncWrite *groupSyncWrite;
dynamixel::GroupSyncRead *groupSyncRead;

int32_t dxl_present_positions[2];

void error_loop() {
  while (1) {
    digitalWrite(LED_PIN, !digitalRead(LED_PIN));
    delay(100);
  }
}

int clamp(int value, int min_value, int max_value) {
  if (value < min_value) return min_value;
  if (value > max_value) return max_value;
  return value;
}

bool led_state = false;

void subscription_callback(const void *msgin) {
  const std_msgs__msg__Int32 *msg = (const std_msgs__msg__Int32 *)msgin;

  led_state = !led_state;
  digitalWrite(24, led_state ? HIGH : LOW);

  // Extract data for both motors
  int raw_data = msg->data;
  int target_position1 = clamp(raw_data % 10000, DXL_MINIMUM_POSITION_VALUE, DXL_MAXIMUM_POSITION_VALUE);
  int target_position2 = clamp(raw_data / 10000, DXL_MINIMUM_POSITION_VALUE, DXL_MAXIMUM_POSITION_VALUE);

  // Prepare goal position parameters
  uint8_t param_goal_position1[4];
  param_goal_position1[0] = DXL_LOBYTE(DXL_LOWORD(target_position1));
  param_goal_position1[1] = DXL_HIBYTE(DXL_LOWORD(target_position1));
  param_goal_position1[2] = DXL_LOBYTE(DXL_HIWORD(target_position1));
  param_goal_position1[3] = DXL_HIBYTE(DXL_HIWORD(target_position1));

  uint8_t param_goal_position2[4];
  param_goal_position2[0] = DXL_LOBYTE(DXL_LOWORD(target_position2));
  param_goal_position2[1] = DXL_HIBYTE(DXL_LOWORD(target_position2));
  param_goal_position2[2] = DXL_LOBYTE(DXL_HIWORD(target_position2));
  param_goal_position2[3] = DXL_HIBYTE(DXL_HIWORD(target_position2));

  // Add parameters to sync write
  groupSyncWrite->addParam(DXL1_ID, param_goal_position1);
  groupSyncWrite->addParam(DXL2_ID, param_goal_position2);

  // Write goal positions
  groupSyncWrite->txPacket();
  groupSyncWrite->clearParam();

  // Read current positions
  groupSyncRead->txRxPacket();
  dxl_present_positions[0] = groupSyncRead->getData(DXL1_ID, ADDR_XL_PRESENT_POSITION, LEN_XL_PRESENT_POSITION);
  dxl_present_positions[1] = groupSyncRead->getData(DXL2_ID, ADDR_XL_PRESENT_POSITION, LEN_XL_PRESENT_POSITION);

  // Publish the current positions
  pub_msg.data.data[0] = dxl_present_positions[0];
  pub_msg.data.data[1] = dxl_present_positions[1];
  RCSOFTCHECK(rcl_publish(&publisher, &pub_msg, NULL));
}

void setup() {
  set_microros_transports();

  pinMode(22, OUTPUT);
  pinMode(23, OUTPUT);
  pinMode(24, OUTPUT);
  pinMode(25, OUTPUT);

  delay(2000);

  allocator = rcl_get_default_allocator();

  RCCHECK(rclc_support_init(&support, 0, NULL, &allocator));
  RCCHECK(rclc_node_init_default(&node, "micro_ros_arduino_node", "", &support));

  RCCHECK(rclc_subscription_init_default(
    &subscriber,
    &node,
    ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Int32),
    "micro_ros_arduino_subscriber"));

  RCCHECK(rclc_publisher_init_default(
    &publisher,
    &node,
    ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Int32MultiArray),
    "micro_ros_arduino_publisher"));

  RCCHECK(rclc_executor_init(&executor, &support.context, 2, &allocator));
  RCCHECK(rclc_executor_add_subscription(&executor, &subscriber, &sub_msg, &subscription_callback, ON_NEW_DATA));

  portHandler = dynamixel::PortHandler::getPortHandler(DEVICENAME);
  packetHandler = dynamixel::PacketHandler::getPacketHandler(PROTOCOL_VERSION);

  groupSyncWrite = new dynamixel::GroupSyncWrite(portHandler, packetHandler, ADDR_XL_GOAL_POSITION, LEN_XL_GOAL_POSITION);
  groupSyncRead = new dynamixel::GroupSyncRead(portHandler, packetHandler, ADDR_XL_PRESENT_POSITION, LEN_XL_PRESENT_POSITION);

  if (!portHandler->openPort()) error_loop();
  if (!portHandler->setBaudRate(BAUDRATE)) error_loop();

  uint8_t dxl_error;
  int dxl_comm_result;

  for (int id : {DXL1_ID, DXL2_ID}) {
    dxl_comm_result = packetHandler->write1ByteTxRx(portHandler, id, ADDR_XL_TORQUE_ENABLE, TORQUE_ENABLE, &dxl_error);
    if (dxl_comm_result != COMM_SUCCESS || dxl_error != 0) error_loop();
    groupSyncRead->addParam(id);
  }

  pub_msg.data.data = (int32_t *)malloc(2 * sizeof(int32_t));
  pub_msg.data.size = 2;
  pub_msg.data.capacity = 2;
}

void loop() {
  RCCHECK(rclc_executor_spin_some(&executor, RCL_MS_TO_NS(100)));
  delay(100);
}

현재 micro-ros 를 가지고 string 형이나 Int32multiArray 형식으로 msg 를 받는 것에 문제가 있어 간접적으로 모터 값을 받도록 했다.

30330203 과 같은 8자리의 int32 정수를 sub 하여 이를 4개씩 잘라서 사용하는 식으로 2개의 모터의 서로 다른 입력을 줄 수 있도록 하였다.

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile

from std_msgs.msg import Int32
import math

class Pub(Node):
    def __init__(self):
        super().__init__('pub')
        qos_profile = QoSProfile(depth=10)
        self.publisher_ = self.create_publisher(Int32, '/micro_ros_arduino_subscriber', qos_profile)
        timer_period = 0.1  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.angle1 = 0  # Initial angle for motor 1 in degrees
        self.angle2 = 90  # Initial angle for motor 2 in degrees (90 degrees phase offset)

    def timer_callback(self):
        msg = Int32()
        # Calculate sine values for two angles and scale to the range [1028, 3028]
        motor1_input = int(1000 * math.sin(math.radians(self.angle1)) + 2028)
        motor2_input = int(1000 * math.sin(math.radians(self.angle2)) + 2028)

        # Combine motor1_input and motor2_input into a single value
        combined_value = motor1_input * 10000 + motor2_input

        # Set the message data
        msg.data = combined_value
        print(f"Publishing combined value: {combined_value} (Motor1: {motor1_input}, Motor2: {motor2_input})")
        self.publisher_.publish(msg)
        self.get_logger().info(f'Publishing: Combined Value: {combined_value} (Motor1: {motor1_input}, Motor2: {motor2_input})')

        # Increment angles and wrap around after 360 degrees
        self.angle1 = (self.angle1 + 5) % 360
        self.angle2 = (self.angle2 + 5) % 360


def main(args=None):
    rclpy.init(args=args)
    node = Pub()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        node.get_logger().info('Keyboard Interrupt (SIGINT)')
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

위에 코드는 ros2 에서 2개의 모터의 서로 다른 입력을 주기 위한 모터 값을 pub 하는 python 코드이다.

image 95

파일을 추가하고 그에 따라 setup.py 파일도 수정해준다. 그리고 아래 코드들을 실행한다.

cd ~/ros2_ws/
colcon build
source /opt/ros/foxy/setup.bash
source ~/ros2_ws/install/local_setup.bash
ros2 run py_pubsub motor2

이에 대한 영상 이다.


0 Comments

Leave a Reply