えいあーるれいの技術日記

ROS2やM5 Stack、Ubuntuについて書いています

Launchで実行順を調整したりイベント管理する(ROS2)

ROS2で複数ノードを起動するときにはlaunchファイルを使用することができますが、そのままの状態では実行順は保証されません。

これだと、他のノードと一緒にシミュレータを動かしたりサーバを動かしたりするLaunchを書いたときに、まだサーバ等などが立ち上がりきっていないままの状態でサーバ前提のノードが動こうとしてしまいます。


それを防ぐための機構として、launchにはevent_handlerが用意されています。

以下の資料(とコード)を読めば全て分かりますが、なんか色々混ざっているので自分なりにサンプルを作りました。

Using event handlers — ROS 2 Documentation: Humble documentation


コードと解説

以下のlaunchは、次の順序で動いています。

  1. talker_listener.launch.pyが動く
  2. 1 の3秒後にxeyesが立ち上がる
  3. xeyesを終了(❌ボタンを押す)したら xeyes exited というLogInfoとともにturtlesimが立ち上がる
  4. turtlesimが終了(❌ボタンを押す)したら、 turtlesim_node exited という文字が出て 1 などの残っているノードらも一緒にシャットダウンする(→ 5 へ強制遷移)
  5. 全てのノードが終了したら、 Shutdown という文字が出る。


gist.github.com


説明

talker_listener.launch.pyの立ち上げ

時間をカウントしてくれるlaunchファイルを立ち上げます。

get_package_share_directory でlaunchファイルまでの絶対パスを記述することなくパス通しができます。

launch_share_dir = os.path.join(get_package_share_directory('demo_nodes_cpp'), 'launch')
talker_listener_launch = IncludeLaunchDescription(
    launch_description_source=os.path.join(launch_share_dir, 'topics/talker_listener.launch.py')
)

xeyesの立ち上げ

次に、xeyesを立ち上げる部分についてです。

launchの開始直後に立ち上げずに3秒遅らせてみましょう。


xeyesの起動には、ExecuteProcess を使用します。

xeyesはROS2のノードではありませんが、 ExecuteProcess を使うことでシェルコマンドを直接叩けます。

xeyes_cmd = ExecuteProcess(
    cmd=['xeyes']
)


次に、 TimerAction を使用して先ほどの xeyes_cmd を遅延させてみます。

xeyes_delay = TimerAction(
    period=3.0,
    actions=[xeyes_cmd]
)

period で秒数、actions内に対象となるアクションを記述していきます。

配列になっているので、複数記述できます。


xeyesの終了後の挙動

launchには特定のプロセスが終了したことをトリガーにした挙動も宣言できます。

ここでは、xeyesの❌ボタンの押下をトリガーにしてみます。

終了をトリガとするアクションには OnProcessExitOnExecutionCompleteも可)を使用します。

turtlesim_after_xeyes = RegisterEventHandler(
   OnProcessExit(
        target_action=xeyes_cmd,
        on_exit=[
            LogInfo(msg="==== xeyes exited ===="),
            turtlesim_node
        ]
    )
)


turtlesim_nodeの開始時・終了時処理

OnProcessStart はプロセスの開始時、 OnProcessExit はプロセスの終了時に呼ばれます。

on_start(OnProcessStart)on_exit(OnProcessExit) にそれぞれ起動させたいアクションを記述します。

LogInfo はROSのログに INFO として出力させるためのアクションです。

EmitEventlaunch.events クラスにあるイベントをトリガするためのアクションです。ここでは、Turtlesimだけkillしたとき(例:Turtlesimの❌ボタンを押した時)にLaunch自体がシャットダウンするようになります。

Shutdown 処理がない場合は、Turtlesimの❌ボタンを押しただけではノードは止まらず、talker_listener.launch.pyは動き続けます。

start_turtlesim_action = RegisterEventHandler(
    OnProcessStart(
        target_action=turtlesim_node,
        on_start=[
            LogInfo(msg="==== turtlesim_node started ====")
        ]
    )
)
end_turtlesim_action = RegisterEventHandler(
    OnProcessExit(
        target_action=turtlesim_node,
        on_exit=[
            LogInfo(msg="==== turtlesim_node exited ===="),
            EmitEvent(event=Shutdown(
                reason='turtlesim_node exited'
            ))
        ]
    )
)


入出力で駆動する

OnProcessIO は指定したアクションの入出力がトリガになるイベントです。

stderrもキャッチできるので、エラーが出た時に外部プロセスがそれを検知したりできます。

ここでは、Turtlesimのログをそのままオウム返しします。

turtlesim_node_io = RegisterEventHandler(
    OnProcessIO(
        target_action=turtlesim_node,
       on_stdout=lambda event: LogInfo(
               msg='Spawn request says "{}"'.format(
               event.text.decode().strip())
           ),
       on_stderr=lambda event: LogInfo(
                msg='Spawn request says "{}"'.format(
                event.text.decode().strip())
        ),
    )
)


LaunchDescriptionに登録

最後にLaunchDescriptionに登録します。

ld = LaunchDescription()
ld.add_action(talker_listener_launch)
ld.add_action(xeyes_delay)

# ==== turtlesim_node ====
ld.add_action(turtlesim_after_xeyes)
ld.add_action(start_turtlesim_action)
ld.add_action(turtlesim_node_io)
ld.add_action(end_turtlesim_action)
# ==== end turtlesim_node ====

ld.add_action(shutdown_action)

return ld


launchファイルは結構オプションがあるみたいで、使用するならちゃんとリファレンス(というよりコード)を読んでおきたいですね。