0편 – [robot arm, manipulator 프로젝트][ubuntu 20.04, ros2, gazebo][0] – ros2, gazebo 설치 및 예제 실행
1편 – [robot arm, manipulator 프로젝트][ubuntu 20.04, ros2, gazebo][1] – ros2 publisher subscriber c++, python 구현 및 실행
2편 – [robot arm, manipulator 프로젝트][ubuntu 20.04, ros2, gazebo][2] – ros2 와 아두이노 통신 (micro ros arduino), OpenCR, robotis, dynamixel 모터 개발 환경 구성 및 예제
3편 – [robot arm, manipulator 프로젝트][ubuntu 20.04, ros2, gazebo][3] – ros2 와 아두이노(OpenCR)로 여러개의 dynamixel 모터 제어 및 엔코더 값 받기, robotis
4편 – [robot arm, manipulator 프로젝트][ubuntu 20.04, ROS2, gazebo][4] – ROS2 u2d2 를 활용한 모터 제어, dynamixel SDK tutorial
2편, 3편 생략 가능

1. 모터 2개 동시에 움직이기

3개의 코드가 존재한다.

  1. 각각의 모터를 어떤 값으로 움직일지 desired 각도를 pub 하는 코드
  2. pub 된 모터 각도를 sub 하여 식제 모터를 움직이는 코드
  3. 모터 엔코더로부터 모터의 현재 각도를 입력받아 pub 하는 코드
  4. 모터의 현재 각도를 sub 하여 실시간으로 plot 하는 코드
image 4

결국 전체적인 흐름도는 위와 같다.

ROS2 를 활용하여 manipulator 각 모터의 각도 입력 – U2D2 를 거쳐서 실제 dynamixel 모터에 각도 입력 – 실제 dynamixel 모터로부터 엔코더 각도 값을 피드백 – 각 모터의 각도값을 plot

1번 코드

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile

from std_msgs.msg import Int32MultiArray
import math

class Pub(Node):
    def __init__(self):
        super().__init__('pub')
        qos_profile = QoSProfile(depth=10)
        self.publisher_ = self.create_publisher(Int32MultiArray, '/cmd_positions', qos_profile)
        timer_period = 0.1  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.angle1 = 0  # Initial angle for motor 1 in degrees
        self.angle2 = 90  # Initial angle for motor 2 in degrees (90 degrees phase offset)

    def timer_callback(self):
        msg = Int32MultiArray()
        msg.data = []  # Initialize the data array

        # Calculate sine values for two angles and scale to the range [1028, 3028]
        motor1_input = int(1000 * math.sin(math.radians(self.angle1)) + 2028)
        motor2_input = int(100 * math.sin(math.radians(self.angle2)) + 2028)

        # Append motor values to the message
        msg.data.append(motor1_input)
        msg.data.append(motor2_input)

        # Publish the message
        print(f"Publishing values: Motor1: {motor1_input}, Motor2: {motor2_input}")
        self.publisher_.publish(msg)
        self.get_logger().info(f'Publishing: Motor1: {motor1_input}, Motor2: {motor2_input}')

        # Increment angles and wrap around after 360 degrees
        self.angle1 = (self.angle1 + 5) % 360
        self.angle2 = (self.angle2 + 5) % 360


def main(args=None):
    rclpy.init(args=args)
    node = Pub()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        node.get_logger().info('Keyboard Interrupt (SIGINT)')
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

\+ setup.py 에서 entry_points 추가하는 것 까먹지 말기

cd ~/ros2_ws/
colcon build
source /opt/ros/foxy/setup.bash
source ~/ros2_ws/install/local_setup.bash
ros2 run py_pubsub motor22
image 2

2. 번 3. 번 코드는 같이

#include <rclcpp/rclcpp.hpp>

#include "std_msgs/msg/int32_multi_array.hpp"

#include "dynamixel_sdk/dynamixel_sdk.h"

rclcpp::Node::SharedPtr node = nullptr;

using namespace dynamixel;

// Control table address

#define ADDR_OPERATING_MODE 11 // 제어모드

#define ADDR_TORQUE_ENABLE 64 // Torque on/off 제어

#define ADDR_GOAL_POSITION 116 // 위치 제어

#define ADDR_PRESENT_POSITION 132 // 현재 위치

// Protocol version

#define PROTOCOL_VERSION 2.0 // Default Protocol version of DYNAMIXEL X series.

// Default setting

#define BAUDRATE 57600 // Default Baudrate of DYNAMIXEL X series

#define DEVICE_NAME "/dev/ttyUSB0" // Port name

#define VALUE 2048 // 초기 목표 위치 (중간값)

#define OPERATING_VALUE 3 // 제어모드, 위치제어: 3

PortHandler *portHandler;

PacketHandler *packetHandler;

GroupSyncWrite *groupSyncWrite;

GroupSyncRead *groupSyncRead;

std::vector<uint8_t> motor_ids = {13, 14}; // 모터 ID 목록

rclcpp::Publisher<std_msgs::msg::Int32MultiArray>::SharedPtr position_publisher;

void publishCurrentPositions() {

std_msgs::msg::Int32MultiArray msg;

msg.data.clear();

// Read positions for all motors

if (groupSyncRead->txRxPacket() == COMM_SUCCESS) {

for (uint8_t id : motor_ids) {

if (groupSyncRead->isAvailable(id, ADDR_PRESENT_POSITION, 4)) {

int32_t position = groupSyncRead->getData(id, ADDR_PRESENT_POSITION, 4);

msg.data.push_back(position);

} else {

RCLCPP_ERROR(node->get_logger(), "Failed to get position data for ID %d", id);

return;

}

}

position_publisher->publish(msg);

RCLCPP_INFO(node->get_logger(), "Published current positions");

} else {

RCLCPP_ERROR(node->get_logger(), "Failed to read positions");

}

}

void topic_callback(const std_msgs::msg::Int32MultiArray::SharedPtr msg)

{

if (msg->data.size() != motor_ids.size()) {

RCLCPP_ERROR(node->get_logger(), "Received array does not match the number of motors.");

return;

}

groupSyncWrite->clearParam();

for (size_t i = 0; i < motor_ids.size(); i++) {

int position = msg->data[i];

// 위치 값을 클램핑

if (position < 0) position = 0;

if (position > 4095) position = 4095;

// 4바이트 데이터 준비

uint8_t param_goal_position[4];

param_goal_position[0] = DXL_LOBYTE(DXL_LOWORD(position));

param_goal_position[1] = DXL_HIBYTE(DXL_LOWORD(position));

param_goal_position[2] = DXL_LOBYTE(DXL_HIWORD(position));

param_goal_position[3] = DXL_HIBYTE(DXL_HIWORD(position));

// GroupSyncWrite에 데이터 추가

if (!groupSyncWrite->addParam(motor_ids[i], param_goal_position)) {

RCLCPP_ERROR(node->get_logger(), "Failed to add parameter for motor ID %d", motor_ids[i]);

return;

}

}

// GroupSyncWrite 패킷 전송

int dxl_comm_result = groupSyncWrite->txPacket();

if (dxl_comm_result != COMM_SUCCESS) {

RCLCPP_ERROR(node->get_logger(), "GroupSyncWrite failed: %s", packetHandler->getTxRxResult(dxl_comm_result));

} else {

RCLCPP_INFO(node->get_logger(), "Updated positions for all motors.");

publishCurrentPositions(); // Publish current positions after movement

}

}

int main(int argc, char **argv)

{

uint8_t dxl_error = 0;

rclcpp::init(argc, argv);

node = rclcpp::Node::make_shared("Dynamixel");

// 포트 핸들러 초기화

portHandler = dynamixel::PortHandler::getPortHandler(DEVICE_NAME);

packetHandler = dynamixel::PacketHandler::getPacketHandler(PROTOCOL_VERSION);

groupSyncWrite = new GroupSyncWrite(portHandler, packetHandler, ADDR_GOAL_POSITION, 4);

groupSyncRead = new GroupSyncRead(portHandler, packetHandler, ADDR_PRESENT_POSITION, 4);

// Add parameters for sync read

for (uint8_t id : motor_ids) {

if (!groupSyncRead->addParam(id)) {

RCLCPP_ERROR(node->get_logger(), "Failed to add parameter for GroupSyncRead for ID %d", id);

return -1;

}

}

// 포트 열기

if (!portHandler->openPort()) {

RCLCPP_ERROR(node->get_logger(), "Failed to open the port!");

return -1;

}

// 통신 속도 설정

if (!portHandler->setBaudRate(BAUDRATE)) {

RCLCPP_ERROR(node->get_logger(), "Failed to set the baudrate!");

return -1;

}

// 모터 초기화

for (uint8_t id : motor_ids) {

// 제어 모드 설정

packetHandler->write1ByteTxRx(portHandler, id, ADDR_OPERATING_MODE, OPERATING_VALUE, &dxl_error);

if (dxl_error != 0) {

RCLCPP_ERROR(node->get_logger(), "Failed to set operating mode for ID %d!", id);

return -1;

}

// 토크 활성화

packetHandler->write1ByteTxRx(portHandler, id, ADDR_TORQUE_ENABLE, 1, &dxl_error);

if (dxl_error != 0) {

RCLCPP_ERROR(node->get_logger(), "Failed to enable torque for ID %d!", id);

return -1;

}

// 초기 위치 설정

packetHandler->write4ByteTxRx(portHandler, id, ADDR_GOAL_POSITION, VALUE, &dxl_error);

}

RCLCPP_INFO(node->get_logger(), "All Dynamixels initialized in position control mode.");

// Create publisher for current positions

position_publisher = node->create_publisher<std_msgs::msg::Int32MultiArray>("/current_positions", 10);

// Create timer for periodic position publishing

auto timer = node->create_wall_timer(

std::chrono::milliseconds(100),

publishCurrentPositions

);

// Subscriber 생성

auto subscription = node->create_subscription<std_msgs::msg::Int32MultiArray>(

"/cmd_positions", 10, topic_callback);

// ROS 2 실행

rclcpp::spin(node);

// 정리 및 토크 비활성화

for (uint8_t id : motor_ids) {

packetHandler->write1ByteTxRx(portHandler, id, ADDR_TORQUE_ENABLE, 0, &dxl_error);

}

rclcpp::shutdown();

// 포트 닫기

portHandler->closePort();

delete groupSyncWrite;

delete groupSyncRead;

return 0;

}

cmakelists.txt 수정해주는 것 까먹지 말기

cd ~/ros2_ws/
colcon build
source /opt/ros/foxy/setup.bash
source ~/ros2_ws/install/local_setup.bash
ros2 run test_dynamixel test_move_pos2 /dev/ttyUSB0

4번 코드

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import Int32MultiArray

import matplotlib.pyplot as plt
from collections import deque
import numpy as np

class PlotCurrentPositions(Node):
    def __init__(self):
        super().__init__('plot_current_positions')
        
        # Create subscriber with QoS profile
        qos_profile = QoSProfile(depth=10)
        self.subscription = self.create_subscription(
            Int32MultiArray,
            '/current_positions',
            self.listener_callback,
            qos_profile)
        
        # Initialize plot
        plt.ion()  # Enable interactive mode
        self.fig, self.ax = plt.subplots()
        self.lines = []
        colors = ['b', 'r']  # Colors for each motor
        
        # Create deques with max length 1000 for each motor
        self.positions = [deque([0]*1000, maxlen=1000), deque([0]*1000, maxlen=1000)]
        self.time_steps = deque(range(1000), maxlen=1000)
        
        # Initialize lines for each motor
        for i in range(2):
            line, = self.ax.plot(list(self.time_steps), list(self.positions[i]), 
                               colors[i], label=f'Motor {i+1}')
            self.lines.append(line)
        
        # Set plot properties
        self.ax.set_xlabel('Time Steps')
        self.ax.set_ylabel('Position')
        self.ax.set_title('Motor Positions Over Time')
        self.ax.legend()
        self.ax.grid(True)
        
        # Set initial axis limits
        self.ax.set_xlim(0, 1000)
        self.ax.set_ylim(0, 4096)  # Full range of Dynamixel positions

    def listener_callback(self, msg):
        # Update position data
        for i, pos in enumerate(msg.data):
            self.positions[i].append(pos)
        
        # Update time steps
        self.time_steps.append(self.time_steps[-1] + 1)
        
        # Update plot data
        for i, line in enumerate(self.lines):
            x_data = list(self.time_steps)
            y_data = list(self.positions[i])
            if len(x_data) == len(y_data):  # Ensure data lengths match
                line.set_data(x_data, y_data)
        
        # Adjust x-axis limits to show latest data
        self.ax.set_xlim(self.time_steps[0], self.time_steps[-1])
        
        # Draw plot
        self.fig.canvas.draw()
        self.fig.canvas.flush_events()


def main(args=None):
    rclpy.init(args=args)
    plot_node = PlotCurrentPositions()
    
    try:
        rclpy.spin(plot_node)
    except KeyboardInterrupt:
        plot_node.get_logger().info('Keyboard Interrupt (SIGINT)')
    finally:
        plot_node.destroy_node()
        rclpy.shutdown()
        plt.ioff()
        plt.close()


if __name__ == '__main__':
    main()
cd ~/ros2_ws/
colcon build
source /opt/ros/foxy/setup.bash
source ~/ros2_ws/install/local_setup.bash
ros2 run py_pubsub plot_angle
image 3

결과 영상 (3dof manipulator 에 적용)

motor 입력 생성 코드 (위 영상과 다른 코드)
– 제어주기를 4ms 로 하여 위 영상보다 부드럽게 움직임

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile

from std_msgs.msg import Int32MultiArray
import math


class Pub(Node):
    def __init__(self):
        super().__init__('pub')
        qos_profile = QoSProfile(depth=10)
        self.publisher_ = self.create_publisher(
            Int32MultiArray, '/cmd_positions', qos_profile)
        self.time_period = 0.004  # 4ms control period
        self.timer = self.create_timer(self.time_period, self.timer_callback)
        self.operation_t = 0.0  # Operation time in seconds
        self.angle1 = 0  # Initial angle for motor 1 in degrees
        self.angle2 = 0  # Initial angle for motor 2 in degrees
        self.angle3 = 0

    def timer_callback(self):
        msg = Int32MultiArray()
        msg.data = []  # Initialize the data array

        # Calculate sine values using operation time
        # Scale to motor resolution (4096) and add offset (2048)
        freq = 0.5  # 0.5 Hz = 2 second period
        motor1_input = int(400 * math.sin(2*math.pi*freq*self.operation_t) + 2048)
        motor2_input = int(200 * math.sin(2*math.pi*freq*self.operation_t) + 2048)
        motor3_input = int(300 * math.sin(2*math.pi*freq*self.operation_t) + 2048)

        # Clamp values between 0 and 4095
        motor1_input = max(0, min(4095, motor1_input))
        motor2_input = max(0, min(4095, motor2_input))
        motor3_input = max(0, min(4095, motor3_input))

        # Append motor values to the message
        msg.data.append(motor1_input)
        msg.data.append(motor2_input)
        msg.data.append(motor3_input)

        # Publish the message
        self.publisher_.publish(msg)
        self.get_logger().info(
            f'Time: {self.operation_t:.3f}s, '
            f'Motors: {motor1_input}, {motor2_input}, {motor3_input}'
        )

        # Increment operation time
        self.operation_t += self.time_period


def main(args=None):
    rclpy.init(args=args)
    node = Pub()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        node.get_logger().info('Keyboard Interrupt (SIGINT)')
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

환경 구성 완료 후 실행해야할 코드들 정리 (터미널 3개)

  1. source /opt/ros/foxy/setup.bash
    source ~/ros2_ws/install/local_setup.bash
    ros2 run test_dynamixel test_move_pos3 /dev/ttyUSB0
  2. source /opt/ros/foxy/setup.bash
    source ~/ros2_ws/install/local_setup.bash
    ros2 run py_pubsub manipulator_test
  3. source /opt/ros/foxy/setup.bash
    source ~/ros2_ws/install/local_setup.bash
    ros2 run py_pubsub plot_angle


0 Comments

Leave a Reply