rospy(ROS Noetic)でのPubSubの基本[rospyチュートリアル] - えいあーる・れいの技術日記の続きです。
動作環境
前回と同様ROS NoeticのPython3.8.5環境(仮想環境なし)を前提にしています。
ウェブカメラのトピックは GitHub - Ar-Ray-code/movie_publisher: ROS1 package that publish of movie file or webcam を用いています。ros-noeticではuvc-cameraは使えないみたいです…
Subscribeのみ(表示はimshowのみ)
ウェブカメラの画像を受信してただ表示させるプログラムです。img_bgr = self.bridge.imgmsg_to_cv2(msg,"bgr8")
で画像をsensor_msgs/msg/Imageからnumpy/ndarray型(cv::Mat相当)に変換します。そして文字を埋め込んでimshowで表示させます。
#!/bin/python3 import rospy from sensor_msgs.msg import Image from cv_bridge import CvBridge import cv2 class hello_world: def __init__(self): self.bridge = CvBridge() cv2.namedWindow("window",cv2.WINDOW_AUTOSIZE) rospy.init_node('put_text_sub') rospy.Subscriber("camera/color/image_raw",Image,self.process_image) rospy.spin() def __del__(self): cv2.destroyAllWindows() def process_image(self,msg): try: img_bgr = self.bridge.imgmsg_to_cv2(msg,"bgr8") cv2.putText(img_rgb, 'Test', (150, 150), cv2.FONT_HERSHEY_PLAIN, 10, (0, 255, 255), 5, cv2.LINE_AA) cv2.imshow("window", img_rgb) cv2.waitKey(1) except Exception as err: print(err) if __name__=="__main__": try: hello_world() except rospy.ROSInterruptException as err: print(err)
SubscribeとPublish(表示はrqt_graph)
ウェブカメラの画像を受信してただ表示させるプログラムですが、再び送信します。まず、img_rgb = self.bridge.imgmsg_to_cv2(msg,"rgb8")
で画像をsensor_msgs/msg/Imageからnumpy/ndarray型(cv::Mat相当)に変換します。次に、文字を埋め込みます。そして、output_img = self.bridge.cv2_to_imgmsg(img_rgb,"rgb8")
でsensor_msgs/msg/Imageに戻して送信します。
この実行結果は、rqt_image_viewで確認することができます。
#!/bin/python3 import rospy from sensor_msgs.msg import Image from cv_bridge import CvBridge import cv2 class hello_world: def __init__(self): self.pub = rospy.Publisher("output_image",Image,queue_size=1) self.bridge = CvBridge() rospy.init_node('put_text_pubsub') rospy.Subscriber("camera/color/image_raw",Image,self.process_image) rospy.spin() def process_image(self,msg): try: img_rgb = self.bridge.imgmsg_to_cv2(msg,"rgb8") cv2.putText(img_rgb, 'Test', (150, 150), cv2.FONT_HERSHEY_PLAIN, 10, (0, 255, 255), 5, cv2.LINE_AA) output_img = self.bridge.cv2_to_imgmsg(img_rgb,"rgb8") self.pub.publish(output_img) except Exception as err: print(err) if __name__=="__main__": try: hello_world() except rospy.ROSInterruptException as err: print(err)