データポートによる通信 (push型)


入力コンポーネントと出力コンポーネントの通信(push型)
ここでは出力ポートを持つコンポーネントと,入力ポートを持つコンポーネントの通信を行ってみます.


コラム:pushとpullは何が違うの?

RTC Builderのデータ型を設定

RTC Builderに使えるデータ型を設定する必要があります.これはIDLというファイルに記述されており,OpenRTM-aistをインストールしたフォルダ内にあります.

設定はEclipseで行います.Eclipseのメニューの「ウィンドウ」>「設定」からダイアログを表示させ,「RTC Builder」の「Data Type」を選択し,ダイアログ内に

[OpenRTM-aistをインストールしたフォルダ]\rtm\idl

と入力します.写真の場合はOpenRTM-aistのフォルダが[C:\Program Files(x86)\OpenRTM-aist\1.0]の場合です.

Setting IDL Paths for RTC Builder

Setting IDL Paths for RTC Builder

まずこの準備をします.Eclipseは再起動した方がいいかも.

出力ポートを持つコンポーネント ConsoleIn

Consoleからデータを入力するからConsoleInですね.

RTC Builderでテンプレートを生成

タブ 変数 設定値
基本 Module name ConsoleIn
Module description Console In Component
Module version 1.0.0
Module vender あなたの名前
Module category TEST
Component type STATIC
Component Kind Data Flow
Component’s activity type PERIODIC
Number of maximum instance 1
Execution Type Periodic Execution Context
Execution Rate 1.0
Abstract 不要
RTC Type 不要
Output Project ConsoleIn
アクティビティ on_execute チェックボックスをONに
データポート

(OutPortプロファイル)

PortName out (ポートの名前)
DataType TimedLong (ポートのデータ型)
VarName out (ポートのデータを格納する変数名)
Disp. Position RIGHT (ポートの表示方向)
言語・環境 言語 C++, Java, Python

コードの編集

m_outOutというOutPortのオブジェクトがすでに宣言されて,m_outというTimedLong型の変数と関連付けられていますので,m_outにデータを書きこみ,m_outOutでデータをネットワークに送り出す,というのが手順です.

 
RTC::ReturnCode_t ConsoleIn::onExecute(RTC::UniqueId ec_id)
{
  std::cout << "Input number:" << std::endl;
  std::cin >> m_out.data;
  m_outOut.write();
  return RTC::RTC_OK;
}

ちなみに,outがTimedLongSeqのように,可変長配列であった場合は下記のようになります.
まず,length関数で長さを決定し,一つ一つ代入します.memcpyも使えるようです.

RTC::ReturnCode_t ConsoleSeqIn::onExecute(RTC::UniqueId ec_id)
{
  m_out.data.length(3);
  for(int i = 0;i < 3;i++) {
    long value;
    std::cout << "Input number:" << std::ends;
    std::cin >> value;
    std::cout << "Data is " << value << std::endl;
    m_out.data[i] = value;
  }
  m_outOut.write();
  return RTC::RTC_OK;
}
Javaはデータポートを実装すると,3つの変数が追加されます.今回,「out」という名前のデータポートを実装しましたので,

  • m_out_val : TimedLong
  • m_out : DataRef
  • m_outOut : DataPort

の3つの変数が追加されます.ここではDataRef型のm_outと,DataPort本体のm_outOutの二つだけを見れば大丈夫.

早速,onExecuteを見てみましょう.

    @Override
    protected ReturnCode_t onExecute(int ec_id) {
    	BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
    	String line;
    	try {
    		System.out.println("Input Number");
    		line = reader.readLine();
    		System.out.println("Number is " + line);
    	} catch (IOException ioex) {
    		System.out.println("Excepiton occured in onExecute.");
    		return RTC.ReturnCode_t.RTC_ERROR;
    	}
 
    	m_out.v.data = Integer.parseInt(line);
    	m_out.v.tm = new RTC.Time(0, 0);
    	m_outOut.write();
 
        return super.onExecute(ec_id);
    }

データの本体へは,m_out.vを使ってアクセスします.シーケンスの時はdataは配列になっています.C++版とちがうのは,タイムスタンプが自動的に付加されないので,自分でTimeクラスのオブジェクトを付加します.一度代入すればエラーは起きないので,onInitializeでTimeクラスの変数を代入しておくという方法もありますね.

outがTimedLongSeqのようなシーケンス(可変長配列)型だった場合は,扱い方が変わります.
先に配列をdataにセットします.毎回セットするのではオーバーヘッドが大きいので,onInitializeなどで一度セットしたら後はnewしない,など工夫してみてください.

    @Override
    protected ReturnCode_t onExecute(int ec_id) {
    	BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
    	String lineBuffer;
	    m_out.v.tm = new RTC.Time(0, 0);    	
    	m_out.v.data = new int[3];
    	for(int i = 0;i < 3;i++) {
	    	System.out.print("Input Number:");
	    	try {
	    		lineBuffer = reader.readLine();
	    	} catch (IOException ioe) {
	    		System.out.println(ioe);
	    		return RTC.ReturnCode_t.RTC_ERROR;
	    	}
	    	int value = Integer.parseInt(lineBuffer);
	    	System.out.println("Value = " + value);
	    	m_out.v.data[i] = value;
    	}
    	m_outOut.write();
        return super.onExecute(ec_id);
    }

Python版では,outという出力ポートを定義すると,データを格納する_d_out変数と,出力ポート本体の_outOut変数が追加されます._d_outにデータをセットして,出力ポートにwriteを指令する,この手順です.

  def onExecute(self, ec_id):
    print "Input number: "
    self._d_out.data = long(sys.stdin.readline())
    OpenRTM_aist.setTimestamp(self._d_out)
    print "Data is %d" % (self._d_out.data)
    self._outOut.write()		
    return RTC.RTC_OK

Python版でTimedLongSeqのようなシーケンス型を使った場合は,配列を用意して,append関数で要素を追加してから,_d_out.dataにデータをセットします.

  def onExecute(self, ec_id):
    OpenRTM_aist.setTimestamp(self._d_out)
    dataBuffer = [];
    print "Input 3 numbers"
    for v in range(3):
      print "Input number: "
      value = long(sys.stdin.readline())
      print "Data is %d" % (value)
      dataBuffer.append(value)
    self._d_out.data = dataBuffer;
    self._outOut.write()	
    return RTC.RTC_OK

入力ポートを持つコンポーネント

RtcTemplateでテンプレートを生成

タブ 変数 設定値
基本 Module name ConsoleOut
Module description Console Out Component
Module version 1.0.0
Module vender あなたの名前
Module category TEST
Component type STATIC
Component Kind Data Flow
Component’s activity type PERIODIC
Number of maximum instance 1
Execution Type Periodic Execution Context
Execution Rate 1.0
Abstract 不要
RTC Type 不要
Output Project ConsoleOut
アクティビティ on_execute チェックボックスをONに
データポート

(InPortプロファイル)

PortName in (ポートの名前)
DataType TimedLong (ポートのデータ型)
VarName in (ポートのデータを格納する変数名)
Disp. Position LEFT (ポートの表示方向)
言語・環境 言語 C++, Java, Python

ファイルの編集

onExecute関数を編集します.

inというデータポートを定義すると,m_inとm_inInというデータポート本体が出来ます.
InPort変数として宣言されているm_inInについて,データ更新があった場合はisNew関数の返り値がTRUEになりますので,read関数で読み込みます.
読み込むとm_in変数の内部にデータが取り込まれ,m_in.dataメンバからデータを読み込みます.シーケンスの場合はm_in.dataが配列のように取り扱えます.

RTC::ReturnCode_t ConsoleOut::onExecute(RTC::UniqueId ec_id)
{
  if (m_inIn.isNew())
  {
    m_inIn.read();
    std::cout << "Data is " << m_in.data << std::endl;
  }
  return RTC::RTC_OK;
}

シーケンスの場合は,length関数を引数なしで使った場合の返り値が配列長さになっています.

RTC::ReturnCode_t ConsoleSeqOut::onExecute(RTC::UniqueId ec_id)
{
  if(m_inIn.isNew()) {
    m_inIn.read();
    std::cout << m_in.data.length() << " datas received." << std::endl;
    for(int i = 0;i < m_in.data.length();i++) {
      std::cout << "Data is " << m_in.data[i] << std::endl;
    }
  }
  return RTC::RTC_OK;
}
Java版の場合,inという入力ポートを定義すると,m_in, m_in_val, m_inInという3つのメンバが出来ます.これも出力ポートと同じで,DataRefクラスのm_inと,データポート本体のm_inInの2つだけ意識すればOKです.

    @Override
    protected ReturnCode_t onExecute(int ec_id) {
    	if(m_inIn.isNew()) {
    		m_inIn.read();
    		System.out.println("Data is " + m_in.v.data); 
    	}
        return super.onExecute(ec_id);
    }

結論からいえば,入力ポートにデータが入ってきているかをisNew関数で監視し,read関数で読み込んだ時点で,m_in変数にデータが入っています.

シーケンスの場合は以下のようになります.配列の扱いがJava的ですから,特に違和感はありません.

    @Override
    protected ReturnCode_t onExecute(int ec_id) {
        if(m_inIn.isNew()) {
            m_inIn.read();
            System.out.println(m_in.v.data.length + " data received.");
            for(int i = 0;i < m_in.v.data.length;i++) {
                System.out.println("Data is " + m_in.v.data[i]);
    	    }
        }
        return super.onExecute(ec_id);
    }
\Python版で入力ポートの扱いは少し違うようです.
入力ポート用のデータ変数_d_inが定義されますが,入力ポート本体の_inInのみを使い,readで読みだすのですが,その帰り値が受け取った値になっています.これをローカル変数で受けてから,データを取り扱うようです.

  def onExecute(self, ec_id):
    if self._inIn.isNew():
      data = self._inIn.read()
      print "Received: ", data.data
    return RTC.RTC_OK

シーケンスを使っても,Pythonの機能で違和感なくコーディング出来ます.

  def onExecute(self, ec_id):
    if self._inIn.isNew():
      dataBuffer = self._inIn.read()
      print "Receive %d datas" % len(dataBuffer.data)
      for v in dataBuffer.data:
        print "Data is %d" % (v)
 
    return RTC.RTC_OK

実行

ここでrtc.confを設定し(corba.nameservers:の設定のみで十分.localhost:2809).Nameserverを実行してから,ConsoleIn,ConsoleOutの両方を実行します.
これで両方のコンソールがRTSystem Editorから見ることが出来るはずです.

RT System Editorによる接続

次に両方のコンポーネントの接続を行います.

ファイル→「Open New System Editor」を選択し,SystemDiagramビューを開きます.このビューに両方のコンポーネントをドラッグ&ドロップすると,コンポーネントがビジュアル化されます.

Drag & Drop

Drag & Drop RTC, then RTC is visualized.


次に二つのコンポーネントについている凸と凹をドラッグ&ドロップで接続します.接続した瞬間にダイアログが立ち上がります.1.0からはここでひと手間.以前からここで接続のプロパティを選択するのですが,必ずここでpushを選びます.
Select Dataflow type

Select Dataflow type


これで接続完了♪
RTCs are connected

RTCs are connected

この後に両方のコンポーネントをアクティブ化すると,ConsoleInコンポーネントのコンソール画面に”Input
number”の文字が出ます.

数字を入力して,Enterを入力すると,ConsoleOutの方に”Data is [あなたが入力した数字]”が表示されます.

まとめ

いかがですか?これでコンポーネント同士のプログラム接続が可能になりました.ようやくRTCの存在価値に近づいてきた気がしませんか?
TimedLong型の例
C++: ConsoleOut
C++: ConsoleIn
Java: ConsoleInOutJ
Python: ConsoleInOutPy

TimedLongSeq型の例
C++: ConsoleSeqInOut
Java: ConsoleSeqInOutJ
Python: ConsoleSeqInOutPy
次回からはコンポーネントをRT System Editorでアクティブ化&接続するのではなく,プログラムから操作が出来るように練習しましょう.

コラム

subscription_typeについて

接続を確立する際に出てくるダイアログで設定できる,subscription_typeは結構大事なのでここで記述しておく.

Select Subscription Type

Select Subscription Type


push型接続を選ぶと,subscription_typeが選べる.選択肢としては,以下の3つがある.

new

OutPort::writeを呼び出した時点で,データがCORBAオブジェクトを使ってInPort側のプロセスに送信される.writeメソッドが変えるまでに時間を要する(オーバーヘッド).

flush

送信側RTCのon_executeメソッドが終わった時点で,送信バッファのデータがすべてInPort側に送られる.とりあえずonExecuteメソッドの内部を速やかに終わらせることが出来る.通常はこれか.

periodic

周期的に送信する.これは送信側RTCと同期がとれない.push rateを設定すれば,送信するスピードを変えられるので便利だが,チューニングするパラメータが増えるので,僕は使いにくいと思っている.