はじめに anchor.png

様々なアプリケーションにおいてDYNAMIXELでつまづかないよう、最低限運転できるところまでの流れを本ページにまとめた。PCの操作やOSについても説明をしなくてはならない事も多々あるが、ここでは「ひとまず」動かせるところまでを一気に紹介する事で、分かった気になってもらう事に主眼を置いた。
そのような中においても多少なりともTipをちりばめてあるので、最終的にはそれらを踏まえたシステムを構築して欲しい。

縦に長くなって閲覧しにくいかもしれないが予めご了承の程。また別ページへのリンクをクリックするとブラウザ上ではページが切り替わるが、何度も往来しているうちに元に戻るのが億劫になる事がある。リンク先を参照する際は「Ctrl」キーを押しながらクリックして新規タブで開けば、多少なりとも閲覧効率が良くなるかも知れない。

※ここに記述された内容は断り無く更新される。

Page Top

ハードウェアの準備 anchor.png

ここではWindows PCからDYNAMIXELを動かす方法のみ紹介する。それ以外の手段で運用するケースはいずれ別のページで。

Page Top

PC anchor.png

ARM版ではないWindows 10がインストールされており、USBポートが装備されたPCを用意する。USBポートは後述のUSB I/Fを接続するためにも必須の装備となる。

Page Top

USB I/F anchor.png

DYNAMIXELは主に2種類のI/Fが規定され、3ピンコネクタ版を俗にTTL版、4ピンコネクタ版をRS-485版と呼んでおり、電気的には相互互換性は無い。ほとんどのDYNAMIXELにはいずれかのタイプのコネクタが2個装備されている。

まずこの2種類のI/Fのコネクタを紹介する。

  • TTL
    Pats NameJST Parts Number
    基板用ヘッダーS3B-EH
    ハウジングEHR-3
    ターミナルSEH-00x
    B3B-EH.png
    端子番号信号名
    1GND電源のマイナス側
    シリアル通信のGNDと共有
    2VDD電源のプラス側
    大半のDYNAMIXELは12V
    3TTL Signalシリアル通信の信号
    双方向
  • RS-485
    Pats NameJST Parts Number
    基板用ヘッダーS4B-EH
    ハウジングEHR-4
    ターミナルSEH-00x
    B4B-EH.png
    端子番号信号名
    1GND電源のマイナス側
    シリアル通信のGNDと共有
    2VDD電源のプラス側
    大半のDYNAMIXELはDC12V
    3RS-485 D+シリアル通信のプラス側差動信号
    双方向
    4RS-485 D-シリアル通信のマイナス側差動信号
    双方向

紹介したTTL及びRS-485の2種類のI/Fは一般的なPCには標準装備されていないため、USBポートを介して増設するための装置が用意されている。複数のI/Fに対応していたり、いずれか1つのI/Fにのみ対応していたり、電源の供給を行えたりとラインナップが複数あるため、以下に現行品のみの比較表を示す。

USB2RS485 dongleUSB2TTL dongleUSB2DXIF dongleUSB2DXIFStarter Kit AStarter Kit BPicoSHIELDDXHUBU2D2
www.besttechnology.co.jp_e00267e5f30aeb4b7bdf.png
www.besttechnology.co.jp_p00268p5f30af2505632.png
www.besttechnology.co.jp_n00269n5f30af0648685.png
www.besttechnology.co.jp_k00291k61ff36e21ffa8.png
www.besttechnology.co.jp_k00264k62d7c3f745615.png
www.besttechnology.co.jp_h00265h62d7fbc6656a1.png
www.besttechnology.co.jp_i00297i640ec82d22466.png
www.besttechnology.co.jp_a00255a5dcac2c8eed34.png
www.besttechnology.co.jp_k00208k5a9cf31f42b33.png
ManufactureBestTechnologyROBOTIS
USBUSB2.0 FS
USB Type-A
USB2.0 FS
USB Type-C
USB2.0 FS
USB micro-B
USB2.0 FS
USB Type-C
USB2.0 FS
micro USB Type-B
USB2.0 HS
USB Type-C
USB2.0 HS
micro USB Type-B
FTDI ChipFT234XFT231XSFT234XRP2040FT232H
Max DTR Rate [Mbps]33126
DYNAMIXEL I/FRS-485 x1TTL x1RS-485 x1, TTL x1RS-485 x1, TTL x1RS-485 x5, TTL x5RS-485 x4, TTL x4RS-485 x1RS-485 x6, TTL x6RS-485 x1, TTL x2
IsolationYesNoYesNo
Dimension [mm]43.2x13.0x8.518.0x12.7x940x4051x23.552x33x1548x18x14.6
Other spec.SmallTinyPower distributionPlastic enclosure
Software compatibilityDYNAMIXEL Wizard 2.0, R+2.0, DXLIB, DX2LIB, DYNAMIXEL SDK

ここで紹介していない古いI/F製品は概ね旧来のDYNAMIXELを前提としており、装備されているコネクタがmolex社製のものが主である。もちろんConvertible Cableを仲介させる事で現行モデルのDYNAMIXELにも接続できるので、持っているのであれば買い直す必要は無い。

Page Top

DYNAMIXEL anchor.png

dxl_series.png

現時点ではAX・MX・X・Pシリーズから選定する事になるが、シリーズごとの大雑把な特徴を紹介しておく。

  • AXシリーズ
    BIOLOIDというロボットキットにかつて同梱されており、12V・TTL・molex社コネクタ・コアドモータ・電流センサ非搭載の廉価モデル。単体で扱おうとすると取り付けが面倒。今後ファームウェアが更新されないので、新規設計の場合はXL/XCシリーズを強く推奨。
  • MXシリーズ
    絶版のRX及びEXシリーズとの形状互換を謳った後継機で、12V・TTL/RS-485・molex社コネクタ・コアド/maxon社コアレスモータ・電流センサ搭載/非搭載。トルクによって4種類の形状(12/28/64/106)がある。MX-12Wを除きファームウェアを更新する事でXシリーズと同等に扱えるようになるが、新規設計の場合はXシリーズを推奨。
  • Xシリーズ
    • XL-320
      ROBOTIS MINI Kitに同梱されており、7.4V・TTL・molex社2mmピッチコネクタ・コアドモータ・電流センサ非搭載。現行モデル。
    • XL330
      5V・TTL・JST社コネクタ・コアドモータ・電流センサ搭載。現行モデル。
    • XC330
      5V/12V・TTL・JST社コネクタ・コアレスモータ・電流センサ搭載。現行モデル。
    • XL430
      12V・TTL・JST社コネクタ・コアドモータ・電流センサ非搭載で廉価モデルという位置付け。2軸を内蔵した2XLもある。現行モデル。
    • XC430
      12V・TTL・JST社コネクタ・コアレスモータ・電流センサ非搭載で。2軸を内蔵した2XCもある。現行モデル。
    • XM430/XM540
      12V・TTL/RS-485・JST社コネクタ・コアレスモータ・電流センサ搭載で、トルクによって2種類の形状(430/540)がある。スタンダードな現行モデル。
    • XH430/XH540
      12/24V・TTL/RS-485・JST社コネクタ・maxon社コアレスモータ・電流センサ搭載で、トルクによって2種類の形状(430/540)がある。現行モデル。
    • XD430/XD540
      12V・RS-485・JST社コネクタ・maxon社コアレスモータ・高耐久・電流センサ搭載で、トルクによって2種類の形状(430/540)がある。現行モデル。
    • XW430/XW540
      12V・RS-485・防水コネクタ・maxon社コアレスモータ・電流センサ搭載。IP68に対応。現行モデル。
  • Pシリーズ
    24V・RS-485・JST社コネクタ・コアレス/ブラシレスモータ・電流センサ搭載・高分解能・低バックラッシュ・高トルク。3種類の形状があるのとかなり高価。唯一定格トルクを謳うハイエンド向け現行モデル。

MXとXシリーズのストールトルクと無負荷最大回転数をモデルごとにプロットしたグラフを紹介しておく。

[Upload]

アプリケーションに要求されるトルクで選定するのが一般的なのだが、DYNAMIXELの大半のモデルは定格トルクではなくストールトルクを謳っている。ストールトルクは物理的にこの値を超えて出力されることは無い値(瞬時最大トルクに近い)であり、連続的に使って良いという意味では無い。

なおここではあえてXシリーズ以降を前提とするので、AXやMXシリーズまたは廃盤品を使用する場合は差異を各自で深掘りした上で適宜読み替えてもらうものとする。また廃盤品を含む全ラインナップの一覧はこちらに掲載した。

Page Top

各装置の接続 anchor.png

各USB I/Fを使用した場合の実態配線を紹介する。
いずれも複数のDYNAMIXELのコネクタをお互いに並列接続し、更にそこへUSB I/Fにて増設されるTTLないしRS-485 I/Fの信号と外部電源を接続する事が主な目的である。

なおmolex社のコネクタを装備したDYNAMIXELを使用する場合はJST社とmoelx社のコネクタを変換するConvertible Cableが必要だが、ここではXシリーズを前提で書き進めるので詳説しない。

Page Top
DXHUBの場合 anchor.png

3ピンのTTLと4ピンのRS-485 I/Fのコネクタが複数備わっている。更にDYNAMIXELへ電源を分配供給する機能があり、その電源の入力用コネクタが装備されている。各装置をDXHUBへ接続すると基本的に接続が完結してしまう。

70&, E148_DXL_connect.png

同一電源で構わないのであればTTLとRS-485 I/Fを装備したDYNAMIXELを同時に運用できる。

Page Top
USB2RS485 dongleの場合 anchor.png

4ピンのRS-485 I/Fのコネクタが1つのみ装備されており、DYNAMIXELへの電源の供給機能は無い。別途DXSharingBoard(RS485)等の分配基板を用意する事でDYNAMIXELへの電源の供給と複数台のDYNAMIXELの接続ができる。

rs485dongle_dxl_sharingboard_connect.png

この例ではPCとDYNAMIXEL間の距離はRobot Cable X4Pに支配されるが、USBドングルとPC間の接続を市販のUSB延長ケーブル等で中継する事で延長する事ができる。
更に正しく処置したRS-485ケーブルを作成する事で、USBドングルとDYNAMIXEL間の距離を延長できる。

Page Top
USB2TTL dongleの場合 anchor.png

3ピンのTTL I/Fのコネクタが1つのみ装備されており、DYNAMIXELへの電源の供給機能は無い。別途DXSharingBoard(TTL)等の分配基板を用意する事でDYNAMIXELへの電源の供給と複数台のDYNAMIXELの接続ができる。

ttldongle_dxl_sharingboard_connect.png

この例ではPCとDYNAMIXEL間の距離はRobot Cable X3Pに支配されるが、USBドングルとPC間の接続を市販のUSB延長ケーブル等で中継する事で延長する事ができる。

Page Top
USB2DXIF dongleの場合 anchor.png

USB2RS485 dongleとUSB2TTL dongleの機能を合わせ持っており、3ピンのTTL I/Fと4ピンのRS-485 I/Fのコネクタがそれぞれ1つずつ装備されている。
そもそもUSB2RS485 dongleとUSB2TTL dongleとUSB2DXIF dongleはコネクタの実装状況が異なる以外は全く同じ代物であるので、USB2RS485 dongleないしUSB2TTL dongleを参考に接続が行える。

E160B(DUAL)_connect.png
Page Top
U2D2の場合 anchor.png

4ピンのRS-485 I/Fのコネクタが1つ、3ピンのTTL I/Fのコネクタが2つ装備されており、DYNAMIXELへの電源の供給機能は無い。別途U2D2 PHBやDXSharingBoard等の分配基板を用意する事でDYNAMIXELへの電源の供給と複数台のDYNAMIXELの接続ができる。

u2d2phb.png

TTLとRS-485 I/Fを同時に運用できる。
なおU2D2 PHBの電源供給用のジャックはROBOTIS社のキットに同梱されるSMPSのサイズに合わせてあり、日本国内で多く流通するものには適合しない。

Page Top

留意事項 anchor.png

机上では動作していたものが、実際に運用を始めた途端に問題が生じる事は良くある。DYNAMIXELを使用したアプリケーションを設計するにあたっての留意事項をいくつか紹介する。

Page Top
DYNAMIXELのI/FとRobot Cable anchor.png

TTL I/Fのシリアル信号は極めて微弱かつ脆弱で、距離を延長する事ができないまでか、DYNAMIXELの電気回路を容易に故障させる可能性すらある。弊社ではTTL I/Fの選択肢しかないモデルを除き、堅牢なシステムにおいてはRS-485 I/Fを搭載したモデルを強く推奨する(モデル名のサフィックスに「-R」と記載)。

また単体製品及びDYNAMIXELに同梱されているRobot Cableはケーブルを製作せずともすぐに接続できる事を念頭に用意されており、一般的に考え得る電気的な問題を解消できるものでは無い。
特にRS-485 I/Fの実力を発揮させるためには既成のRobot Cableは適さないため、電源容量と延長距離に対応したケーブルの製作を推奨する。

RS-485_IdealCable.png

RS-485の終端抵抗の処置も行っておきたいところだ。

Page Top
電源装置 anchor.png

先の図では電源装置をSMPS等と表記しているが、任意電圧に設定できる電源装置やバッテリを負荷が要求する電圧と電流に合わせて用意する。しかしDYNAMIXELに内蔵されたモータの回生電力によって電圧が上昇してしまう事までは注意が行き届かないと思う。特に固定出力の一般的なスイッチングACアダプタには出力電圧と電流は規定されているが、出力側の耐圧やシンク能力までを明記しているものはまず無い。少なくとも誘導性負荷に対応しているものを選定しないと容易に故障する。

また電源からの距離をかせぐための延長ケーブルは、平行線ではなくツイストペアケーブル(より対線)を使うべきだ。

Page Top
電気的絶縁 anchor.png

回路上PC・USBポート・USB I/F・DYNAMIXELの全てのGNDが共通になっている必要があるが、DYNAMIXEL側で電流サージが発生したりPCとDYNAMIXELに供給されている電源自体に大きな電位差があったりすると、GNDの電位が共通である事による問題が生じる場合がある。
大抵のPCのUSBポートには過電流保護機能が備わっているので、コネクタの抜き差し時に発生する電流サージ程度には耐性がある。しかしモータから生じるサージとなると、保護回路では太刀打ちできないレベルのものが発生する事があり得る。またUSBポートの過電流保護機能が働くとUSBポートが一時的に機能を失い、それを想定していないプログラムやOSであれば容易に暴走する。
更に各装置に供給される電源の電位差はサージ過電流以上に厄介で、瞬間的なものではなく定常的に過電流状態が維持されてしまい、あらゆる装置の永久破壊につながる。
いずれの問題にも幾度となく遭遇し、壊したPCやI/Fが何台ある事か。。。

そういった諸問題を解消する目的で、USB2RS485 dongle・USB2TTL dongle・DXHUBはいずれもUSBポートとDYNAMIXEL用I/F及び電源の間を電気的に絶縁している。

なお以後の解説では接続の容易さを買って絶縁機能を持ったDXHUBを使用する。

Page Top

動作テストと設定変更 anchor.png

複数台のDYNAMIXELの運用は理解が進んでから行う事なので、まずは1台のDYNAMIXELのみを使って最低限必要な操作方法を習得する。

Page Top

DYNAMIXELについて知っておくべき事 anchor.png

DYNAMIXELに関係するID番号・通信速度・モデル番号・コントロールテーブル・ファームウェアといったキーワードがあるが、大半はDYNAMIXELの外観から判別できない内部情報であり、後述するPC用のツールを用いる事で初めて認識できる。

  • ID番号
    通信プロトコルにおいて個々のDYNAMIXELの識別を行う際に0~252,254の数値を用いており、ID番号と呼ぶ。
    出荷時および完全初期化時のID番号は一部を除き1。
  • 通信速度
    本来であれば最高速度で動作していれば良いのだが、電気的だったりやハード的な制約等を回避するため任意に設定できる。
    出荷時および完全初期化時の通信速度は一部を除き57600[bps]。
  • モデル番号
    モデル名(製品名のサフィックスにある「-T」と「-R」を除いた部分)に紐付く16ビット幅の値。
    全てのモデルが全く同じ機能を持っているものでは無いため、それを判別するためにモデル番号を用いる。
  • コントロールテーブル
    様々な情報(アイテム)をメモリ上に保持しており、それらを一覧にしたものをコントロールテーブルと呼ぶ。モデルによってアイテムの割り当て先(アドレス)が異なっていたり機能が異なっていたりする。
    前述のID番号・通信速度・モデル番号はコントロールテーブル上に割り当てられており、通信プロトコルは主にコントロールテーブルへのアクセス手段を提供する。
  • ファームウェア
    DYNAMIXELに内蔵されているマイコンの制御ソフトウェアをファームウェアと呼ぶ。そのファームウェアは時折更新される事があり、後述のツールを用いて必要に応じてDYNAMIXELに適用する事ができる。
Page Top

ハードウェアの準備 anchor.png

電源・電源ケーブル・USBケーブルは各製品に標準添付されていないため、必要に応じて別途調達する。

接続は先に紹介したDXHUBの場合の通りとなるが、初期設定の場合は混乱を避けるため1台のみのDYNAMIXELを接続する。
DXHUBをPCのUSBポートに接続してデバイスドライバのインストールを行った後に、こちらの情報を元に仮想COMポートの設定を予め変更しておく事を推奨する。
またDXHUBの裏側が不用意に何かに触れてショートすると取り返しの付かない事になるため、予め四隅の取り付け穴にスペーサ等を取り付けて端子等がショートしないようにしておくと良いだろう。

Page Top

DYNAMIXEL Wizard 2.0のインストール anchor.png

wizard2_main_001.png

DYNAMIXEL Wizard 2.0はDYNAMIXELとコミュニケーションを行うソフトウェアで、主に以下の機能を有する。

  • 検索
    設定された条件が不明確なDYNAMIXELを検索し発見。
  • ファームウェアの更新
    インターネットを介してDYNAMIXELの最新ファームウェアをダウンロードし更新したり、異なるファームウェアへ書き換える事が可能。
  • 簡易運転
    コントロールテーブルのうち主要なアイテムへ簡便にアクセスして運転可能。
    主に動作モード・目標位置・目標速度・トルクの変更をスイッチやダイアル操作で行える。
  • 設定変更
    コントロールテーブルの情報全てを一覧でき、また変更可能なアイテムの編集が可能。
  • 複数台の一元管理
    異なるID番号を持つ複数のDYNAMIXELの一覧を列挙し個別に操作可能。

DYNAMIXELの専用通信プロトコルを用いて対象のDYNAMIXELのコントロールテーブルにアクセスしているに過ぎないが、このようなツールを使用しない場合はちょっとした設定変更をするにしても自前でプログラムを作らざるを得ない。ROBOTIS社のサイトからダウンロードしてインストールしておく。

なおDYNAMIXEL Wizard 2.0はインターネットを介してツール本体とファームウェアの更新版を取得する事ができる。ここでは詳説しないが、インターネットに接続された環境でDYNAMIXEL Wizard 2.0を起動した直後に自動的に更新がないかを確認し、必要があれば更新するか否かを問い合わせてくる。Install Nowボタンを押すと更新が開始されるが、特に適用する必要が無いと判断したらInstall laterボタンを押す事で更新はキャンセルされる。

dxlwiz_update.png
Page Top

DYNAMIXEL Wizard 2.0の検索条件の設定 anchor.png

インストールしたDYNAMIXEL Wizard 2.0を起動したらツールバーのOptionsボタンをクリックし、開いたOptionsダイアログボックスの左側のScanをクリックする。

DXLWiz2_Options.png

以下の項目を設定してOKボタンを押すと、DYNAMIXEL Wizard 2.0の検索条件の期設定はひとまず完了である。次回起動した際も本設定が反映される。

  • Select protocol to scan.
    「Protocol 2.0」のみにチェック。
  • Select port to scan.
    DXHUBをPCに接続した際に追加されたCOMポートのみにチェック。
  • Select baudrate to scan.
    「57600 bps」のみにチェック。
  • Set ID range to scan.
    Startには0、Endには252を設定。

なお本設定は出荷時のDYNAMIXEL Xシリーズを対象としているため、後々のDYNAMIXELの設定等によっては以下のように適宜変更する。

  • Select protocol to scan.
    DYNAMIXEL DX/AX/RX/EX/MXシリーズやプロトコル1.0に設定したXシリーズを対象とする場合には「Protocol 1.0」をチェック。
  • Select baudrate to scan.
    57600bps以外の通信速度に変更しているならばその通信速度をチェック、もし設定された通信速度が不明な場合は全ての通信速度をチェック。
Page Top

DYNAMIXEL Wizard 2.0による検索 anchor.png

DXHUBの電源スイッチをONにして電源の供給を開始すると青色のLEDが点灯し、DYNAMIXELに装備されたLEDが0.5秒だけ点灯する。その後DYNAMIXEL Wizard 2.0のツールバーのScanボタンをクリックすると、検索条件の設定に従って検索が開始される。検索中のダイアログボックスにはプログレスバーが表示され、見つかったDYNAMIXELはダイアログボックスとメイン画面の画面左側のCOMポート番号と通信速度のツリーの配下に逐次列挙される。

DXLWiz2_Scan.png

Select baudrate to scan.の全ての通信速度がチェックされていると検索の終了までにかなりの時間がかかるが、検索中に目的のDYNAMIXELが見つかったり途中で中断する場合はSkipボタンを押す。

DYNAMIXEL Wizard 2.0は検索によって見つかったDYNAMIXELのみを操作対象とするため、検索は起動直後に毎回強いられる。検索条件の設定のSelect baudrate to scan.のチェックを減らしたり、Set ID range to scan.のStat及びEndのID番号を適宜変更する事で検索時間を短縮できる。

もし検索で見つからない場合は、検索条件の設定が対象のDYNAMIXELに合致しない・COMポートが間違っている・電源が入っていない・いずれかの装置の故障等の理由が考えられるが、原因を特定するのに少々難儀する場合もある。

Page Top

DYNAMIXEL Wizard 2.0による試運転 anchor.png

出荷時状態のDYNAMIXELの動作モードは大抵「位置決め制御」モードとなっており、任意の位置を指示する事でDYNAMIXELのサーボホーンの位置がそれに伴って移動する。DYNAMIXEL Wizard 2.0では以下の手順と操作で位置を指示できる。

  1. 検索
    対象のDYNAMIXELが見つかっていなければ検索する。
  2. 運転対象のDYNAMIXELを選択
    画面左側のCOMポート番号と通信速度のツリーの配下から任意のDYNAMIXELをクリックして選択すると(既に選択状態の場合もある)、DYNAMIXEL Wizardは対象のDYNAMIXELとの通信を開始する。正常であればステータスバー上のLEDやDXHUBのLEDが周期的に明滅しているはずである。
    DXLWiz2_SelDX.png
  3. TorqueをON
    画面右上のTorqueとあるスイッチをクリックしてON状態(赤色)にする。
    DXLWiz2_DXTorqueOn.png
  4. 位置を指示
    画面右上の円をクリックしたりドラッグするとDYNAMIXELへ位置(赤色の針の位置)が指示され、同時にDYNAMIXELのホーンの位置が移動する筈である。緑色の針は現在のホーンの位置を表している。
    DXLWiz2_DXGPos.png

この手順で思ったようにDYNAMIXELのホーンが動かない場合は、DYNAMIXELが故障している可能性が高い。

Page Top

DYNAMIXEL Wizard 2.0によるID番号の変更 anchor.png

複数のDYNAMIXELを同時に運用する場合は、個々に異なるID番号を設定しておかなくてはならない。通信プロトコルの仕様では複数のDYNAMIXELのID番号が同じだと個々を識別できないからである。

  1. 検索
    対象のDYNAMIXELが見つかっていなければ検索する。
  2. 変更対象のDYNAMIXELを選択
    画面左側のCOMポート番号と通信速度のツリーの配下から任意のDYNAMIXELをクリックして選択すると(既に選択状態の場合もある)、DYNAMIXEL Wizardは対象のDYNAMIXELとの通信を開始する。正常であればステータスバー上のLEDやDXHUBのLEDが周期的に明滅しているはずである。
    DXLWiz2_SelDX.png
  3. TorqueをOFF
    画面右上のTorqueとあるスイッチがON(赤色)ならばクリックしてOFF状態(灰色)にする。
    DXLWiz2_DXTorqueOff.png
  4. 画面中央のコントロールテーブルから「ID」を選択
    アドレスの7に「ID」が割り当てられているので、該当アドレスをクリックして選択状態にする。
    DXLWiz2_SelCtrlTableID.png
  5. ID番号を変更 画面右下の「ID」の一覧から任意のID番号を選択しSaveボタンを押す事で即時反映される(ここでは例として「ID 1」から「ID 3」へ変更)。
    DXLWiz2_ChangeID.png
    なお複数のDYNAMIXELが検索されている時は、既に見つかっているID番号は選択できない。
Page Top

DYNAMIXEL Wizard 2.0による通信速度の変更 anchor.png

DYNAMIXELのデフォルトの通信速度は57600bpsだが、軸数が増えたりタイトな通信をし始めるとトラフィックが渋滞し、体感的にも応答が遅いと感じたりする。高い通信速度は様々な環境に影響されやすいため最高速まで設定する必要は無いが、1000000bps程度であれば経験的に安定して使用できる速度と考える。以下に通信速度を変更する手順を示す。

  1. 検索
    対象のDYNAMIXELが見つかっていなければ検索する。
  2. 変更対象のDYNAMIXELを選択
    画面左側のCOMポート番号と通信速度のツリーの配下から任意のDYNAMIXELをクリックして選択すると(既に選択状態の場合もある)、DYNAMIXEL Wizardは対象のDYNAMIXELとの通信を開始する。正常であればステータスバー上のLEDやDXHUBのLEDが周期的に明滅しているはずである。
    DXLWiz2_SelDX.png
  3. TorqueをOFF
    画面右上のTorqueとあるスイッチがON(赤色)ならばクリックしてOFF状態(灰色)にする。
    DXLWiz2_DXTorqueOff.png
  4. 画面中央コントロールテーブルから「Baud Rate (Bus)」を選択
    アドレスの8に「Baud Rate (Bus)」が割り当てられているので、該当アドレスをクリックして選択状態にする。
    DXLWiz2_SelCtrlTableBaudrate.png
  5. 通信速度を変更
    コントロールテーブル上では通信速度そのものの値ではなく、代わりに0~7の値で指示する。
    Value通信速度[bps]
    09,600
    157,600
    2115,200
    31,000,000
    42,000,000
    53,000,000
    64,000,000
    74,500,000
    1000000bpsに変更するには画面右下のリストから「3 1Mbps」を選択しSaveボタンを押す事で即時反映される。
    DXLWiz2_ChangeBaud.png
Page Top

自作のプログラムから動かす anchor.png

プログラミングの知識が無ければ何も始まらないし、それ相応の環境を整える必要もある。
ここでは弊社が提供するWindows向けの簡易的な統合環境と専用のライブラリを用い、C言語でコーディングする。コンパイル後に生成されるプログラムは、いずれもCUIベースである。

Page Top

Dynamixel Protocol 2 Library anchor.png

DYNAMIXELは専用の通信プロトコルでのみ通信できるが、通信プロトコルを処理するプログラムを自前でコーディングするのにはそれ相応の知識と労力が要求される。その手間を軽減させる目的で提供しているのがDynamixel Protocol 2 Library(DX2LIB)である。
他にもこういった目的のライブラリはあるが、DYNAMIXELの主要な機能を利用する際にモデルやコントロールテーブルの差異をほぼ意識しなくて済む追加APIをDX2LIBは持っている。追加APIを使用している限りユーザーはCOMポート番号・通信速度・ID番号程度を把握していれば良く、最終的には少ないソースコード量で大抵の処理が行えると謳っている。また各APIはコンパイル済みのDLLに含まれているので、自分のプログラムから呼び出す手段さえあれば言語は問わない。
見ての通り追加APIのソースでは大した事はしておらず、モデルによって異なるアイテムの変換や手順を強いられるアイテムの手順操作、物理値とアイテム間の単位変換、複数IDの一括処理に終始しているだけである。しかし追加APIを使わない場合は、コントロールテーブルのアイテムの機能を習熟する段取りから始めざるを得ないので、説明を端折るにはありがたい :-P

なおプロトコルV1.0用のライブラリも用意されているので、どうしてもDX/AX/RX/EX/MXシリーズを動かしたいという場合は所々読み替えて対応してもらいたい。
また我々としてはプロトコルV1.0とV2.0を混在させた運用は危険と判断しているのでライブラリはプロトコルのバージョン別に用意しているが、DYNAMIXEL SDKでは同時に運用できる事になっている。

Page Top
ダウンロードと必要なファイル anchor.png

こちらより最新のアーカイブファイルをダウンロードし、適宜解凍する。

以後に紹介するGCC Developer Liteでコーディング・コンパイル・実行には、ライブラリに含まれる次の4つのファイルがソースコードファイルを保存したフォルダと同じ場所に配置されている事が前提となる。

ファイル名説明
dx2lib.h各APIのプロトタイプ宣言等を含むヘッダファイル
dx2memmap.hコントロールテーブル上の主要アイテムのアドレスが定義されたヘッダファイル
dx2lib_x32.dllライブラリ本体が含まれた32ビット版DLL
dx2lib_x64.dllライブラリ本体が含まれた64ビット版DLL

このうち「dx2memmap.h」は一部のDYNAMIXELのコントロールテーブルに配置されたアイテムのアドレスを記述しているファイルなのだが、アイテムのアドレスを指定してアクセスしない限り必須ではない。また32ビット版と64ビット版のDLLはコンパイル時の条件によっていずれか一方で構わない。その他のファイルは一切必要ない。

Page Top

GCC Developer Lite anchor.png

gdl.png

GCC Developer Liteはマイコンボード製品向けにプログラムを手っ取り早く動かせるツールとして提供しているが、ここではWindows上で動作するプログラムをコーディング及びコンパイルする事が目的となる。
最近の統合環境の類いに比べたら極めて少ない機能しか備えておらず正直なところ今更な感じではあるのだが、無償かつ簡単な設定でソースコードの編集とコンパイルができるという理由だけで選定した。

Page Top
インストーラのダウンロードとインストール anchor.png

ここではWindows向けの実行プログラムを生成する事が目的なので、ここから「基本パック」と「WIN64パック」をダウンロードする。手順を間違えるとインストールできないので、こちらを読んでから作業する。

Page Top
ソースコード編集・ファイル操作 anchor.png

後述のソースコードを使用する際に必要な操作を紹介する。

まずブラウザから該当のソースコードをクリップボードにコピーし、GCC Developer Liteのエディタウィンドウ上にペーストする。ソースコード中のCOMポート番号の部分("\\\\.\\COM3")の数値を自身の環境に合わせて書き換えたら編集作業は完了である。
編集中のソースコードをファイルとして保存しておけば、後々再度読み出して再利用できる。保存するには「ファイル(F)」メニュー内の「名前を付けて保存(A)...」をクリックし任意の場所と名前で保存する。なおファイル名のフルパスに全角文字・機種依存文字・空白が含まれているとコンパイルに支障をきたすので、基本的には半角の英数字で構成しておく事。

Page Top
コンパイルオプション anchor.png

Windowsの64bit版を想定したのでWIN64パックをインストールする事としたが、32bitの場合はWIN32パックを選定するまでである。もし32bit版を適用した場合は64bitとある所を32bit、x64とある所をx86に読み替える事。
コンパイルしたいソースコードを開いている状態で、「ツール(T)」メニュー内の「コンパイルオプション(O)」をクリックしコンパイルオプションダイアログボックを開く。上端の設定リストをドロップダウンし「Windows x64 (Console)」を選んでOKボタンを押すと、32ビット版のコンパイル条件の設定は全て完了する。

GDL_SelCompileOption.png

なお後述のビルドデバッグ情報付きビルドを行った既存のソースコードファイルを開き直した場合、「保存済みの環境設定ファイルが見つかりました。」というメッセージダイアログボックスが表示される。

GDL_ReloadCfg.png

このメッセージは過去にコンパイルオプションを設定してコンパイルされた事を意味し、OKボタンを押す事でコンパイラオプションを設定したのと同等の扱いとなる。

Page Top
ビルドと実行 anchor.png

後述のライブラリを利用したソースコードをコンパイルするには、ライブラリに含まれる4つのファイルが必要となる。それらファイルを予めソースコードを保存したフォルダにコピーしておく事を忘れないように。
コンパイラオプションが正常に設定されていれば、「コンパイル(C)」メニュー内の「ビルド(B)」が利用できるようになる。「ビルド(B)」をクリックする(もしくは「F9」キーを押す)とコンパイラが呼び出され、問題が無ければソースコードから実行可能なプログラムが同じフォルダの中に生成される。

GDL_Build.png

例えばソースコードのファイル名が「test1.c」ならば、「test1.exe」というファイル名で実行ファイルが生成される。実行ファイルが生成されたら、そのファイルをエクスプローラー等から開いて実行する。

GDL_AppRun.png

実行中はコマンドプロンプトが表示され、標準出力へ何かしらのメッセージが出力される。

CMD_Execute.png

なおエクスプローラーから起動したプログラムは、メッセージを確認する間もなく終了と同時にコマンドプロンプトが閉じてしまう。メッセージを確認する必要がある場合は予めコマンドプロンプト(cmd.exe)を起動しておき、その中でコンパイルしたプログラムを実行すれば終了時に勝手に閉じることはない。

Page Top
デバッグ情報付きビルドとデバッガ anchor.png

ビルドと実行と大差は無いが、コンパイル後にデバッガを介してプログラムを実行する。ここではデバッガを使ってデバッグする事が目的というよりも、GCC Developer Liteのみでコンパイルからプログラムの実行まで全て行うためだけに使う。

コンパイラオプションが正常に設定されていれば、「コンパイル(C)」メニュー内の「デバッグビルド&デバッガで開く(G)」が利用できるようになる。「デバッグビルド&デバッガで開く(G)」をクリックする(もしくは「Ctrl」+「F9」キーを押す)とコンパイラが呼び出され、問題が無ければソースコードから実行可能なプログラムが同じフォルダの中に生成される。

GDL_DebugBuild.png

コンパイルが成功すると「デバッガ(GDB)を起動しますか?」と問い合わせてくるので、OKを押すとデバッガ(GDB)が開く。

GDL_GDBRun.png

GDBの操作説明は省くが、そのプロンプト上で「r」と入力してエンターキーを押すとコンパイルしたプログラムが実行される。

GDB_Execute.png

なおデバッガに興味を持ったのであれば、その使い方を検索してみると良い。

Page Top
その他の機能 anchor.png

ソースコードの編集以外には大した機能は無いが、その中でも余り知られていない機能を紹介する。

  • 外部ファイル参照
    ソースコードのヘッダ等のファイル名が記述されたところにマウスカーソルを置き「Ctrl」キーを押しながらダブルクリックすると、該当のファイルを検索して別ウィンドウでそのファイルを開く事ができる。
    GDL_NewWindow.png
    ヘッダファイルの定義を参照する際に使える。
  • ブラウザ機能
    外部ファイル参照と似ているが、ソースコード中にコメント等でURLを記述しておき、そこにマウスカーソルを置き「Ctrl」キーを押しながらダブルクリックすると、別ウィンドウでそのサイトを開く事ができる。
    GDL_NewIEWindow.png
    APIのドキュメントはホームページにのみ公開しているので、そのURLをソース中のコメントに記述しておいて、適宜参照する際に利用してはどうだろうか。
  • タグジャンプ
    「検索(S)」メニュー内の「タグファイル自動生成(C)」と「タグ生成オプション」内の「タグ生成時にカレントフォルダを含める」をチェックしておく。この状態でソースコードをファイルに保存するとバックグラウンドでタグファイルが生成し直される(特に見かけ上変化はないが処理が長時間に及ぶ事もある)。
    準備ができたらソースコードの関数名が書かれた部分にカーソルを置き、「Ctrl」キーを押しながら「]」キーを押してから「]」キーのみを放す(「Ctrl」キーのみが押された状態)とエディタウィンドウ中央にその関数名の宣言がなされているファイルの一覧が表示される。
    GDL_TagJump.png
    その関数名の参照先が見つからなかった場合は、ファイルの一覧は表示されない。「Ctrl」キーが押されままの状態でファイルの一覧が表示されている間にカーソルキーの上下やマウスのクリックでファイル一覧から任意のファイル名を選択すると、そのファイルが別ウィンドウが開いてかつ該当するタグが記述されている場所にジャンプする。もし編集中のソースコードが参照先の場合は、別ウィンドウは開かずにそのまま該当箇所にジャンプする。
    なおタグとソースコードとの相関はほぼ皆無なので、あくまで参考程度に。
  • エンコード
    ソースコードファイルはテキストファイルなのだが、コンパイラやツール類はその文字コードに多分に影響を受ける。エディタ上では問題無く閲覧できているにもかかわらず、含まれる文字によっては不可解なエラーが発生するといった事が考えられる。
    GCC Developer LiteのデフォルトエンコードであるShift_JISでは問題が生じやすいため、できる限りUTF-8もしくはBOM付きUTF-8に変更た上でソースコードファイルを保存する事を推奨する。画面最下段のステータスバーの右側に現在のエンコードと改行の状態が表示されており、右クリックして表示されるポップアップメニューより任意のエンコードと改行に変更できる。
    GDL_Encode.png
  • 文字サイズ
    GCC Developer Liteは最近の高DPIなPCに対応していないため、高解像度になればなるほどエディタ画面の文字サイズが小さく表示され、場合によっては普通の視力では識別できない程小さく表示される。その場合は「Ctrl」キーを押しながらマウスホイールを回転させるかピンチイン・アウトさせると、それと連動して文字サイズが変わる。ソースコード全体の見通しを良くするために縮小したり、小さすぎて見えないから拡大するといった使い方ができる。
    GDL_Zoom.png
Page Top

トライアウト anchor.png

紹介するソースコード上ではID番号が1~4に設定されたDYNAMIXELがあり、信速度は1Mbpsに変更してあるものとした。自身の環境向けにはPC上のDXHUBに割り当てられたCOMポートの番号を修正する程度で済む筈だ。ID番号が4つよりも少なくても多くても動かす事はできるが、少なくともID番号が1と2に設定されたDYNAMIXELが無いと一部の機能の確認ができない。
またDYNAMIXELのコントロールテーブルには機能を大きく左右するOperating Modeなるアイテムがあり、出荷時は3(Position Control Mode)に設定されている。紹介するソースコードによってはそのモードを変更しており、DYNAMIXELの電源を投入し直してもモードはそのまま保持される。他の目的でDYNAMIXELを使用する際はOperating Modeが変更されている可能性がある事を常に失念しないよう注意する事。
さらに初期条件を整える時を除いてエラー処理の類いを行っておらず、何かしらの不都合があったとしてもプログラムの実行は停止しないまでか異常終了する事も考えられる。DX2LIBのAPIの大半は戻り値で実行の正否の判定ができるので、堅牢なアプリケションを構成する際は判断材料にすると良いだろう。

少なくともソースコードをコンパイルするに当たっての前提は、

である。

なおGCC Developer LiteはGNUツールを用いてコンパイルしているため、MSYS2の環境でもコンパイルできる。またCOMポート名や時間にかかる関数を修正すれば、他のOSでも容易に流用できる。

Page Top
DYNAMIXELの検索とモデル名の列挙 anchor.png

以後のソースコードには、APIを使用するのに共通する最低限必要な処理が含まれている。

  1. LoadDLL
    「dx2lib.h」をインクルードする前に「_DYNAMICLOAD」を宣言しているので、LoadDLLを実行してAPIを提供するDLLを動的に読み込む。実行プログラムと同じフォルダにDLLが無い場合は実行を終了させる。
  2. DX2_OpenPort
    COMポート名と通信速度を指定し、オープンに成功した際の戻り値を以後APIの引数に用いる。ソースコードでは"\\\\.\\COM3"といった具合にポート名に「\\\\.\\」といったプレフィクスを付与しているが、常に記述しておく事を推奨する。
  3. DXL_ScanDevices
    全ID番号を対象としたDYNAMIXELの検索と見つかったID番号のテーブルの生成を行う。戻り値は見つかったDYNAMIXELの数で、引数に指定した配列にはそのID番号が保存される。以後検索で見つからなかったID番号を指定してAPIを呼び出しても、実際には処理は行われずに無視される。
  4. DX2_ClosePort
    DX2_OpenPortでオープンしたCOMポートのクローズする。以後は引数で指定したデバイスIDが使用できなくなる。
  5. UnloadDLL
    DLLの破棄を行う。この処理以後は全APIが使えなくなり、万が一呼び出してしまうと一般保護エラーが発生する。

最終的には検索とCOMポートのクローズの間に実際に行いたい処理を追記する事になる。
DXL_ScanDevicesによる検索はDYNAMIXELのモデルが想定と違っていてもある程度順応できるようにと考えての事だが、諸設定が最適に設定されていたとしても検索が終わるまではそれ相応の時間(約10秒程度)かかる。

// DYNAMIXELの検索とモデル名の列挙
#include <stdio.h>
#define _DYNAMICLOAD  // DX2LIBの動的呼び出しを宣言
#include "dx2lib.h"   // DX2LIBのヘッダ

int main (void) {
  // DLLの読み込み(成功は必須)
  if (LoadDLL ()) {
    uint8_t IDs[253];   // 見つかったIDのリスト
    int detectnum = 0;  // 見つかった数

    // COMポートのオープン
    TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000);

    // devが0でなければオープン成功
    if (dev) {
      // 検索
      detectnum = DXL_ScanDevices (dev, IDs);

      // 見つかったモデルをコンソールに列挙
      DXL_PrintDevicesList ((void *)&printf);

      // この辺りに目的の処理を追記

      // COMポートのクローズ
      DX2_ClosePort (dev);
    }
    // DLLの破棄
    UnloadDLL ();
  }
}

正常に検索が終了すると左からID番号・モデル名・モデル番号の順に次のような情報が表示される。

[  1] 2XL430-W250    ($0442)
[  2] 2XL430-W250    ($0442)
[  3] 2XL430-W250    ($0442)
[  4] 2XL430-W250    ($0442)

なおID番号が1~4のDYNAMIXELが存在する前提でのDXL_GetModelInfo APIを使った登録方法も一応紹介しておく。こちらは指定されたID番号のみを対象に登録できるかを判断するので、全ID番号を対象とした検索を行った場合と異なりほぼ一瞬で処理が終了する。

// ID番号を決め打ちする場合
#include <stdio.h>
#define _DYNAMICLOAD
#include "dx2lib.h"

int main (void) {
  // DLLの読み込み(成功は必須)
  if (LoadDLL ()) {
    // COMポートのオープン
    TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000);

    // devが0でなければオープン成功
    if (dev) {
      // 検索の代わりにID番号を直接指定して登録
      DXL_GetModelInfo (dev, 1);
      DXL_GetModelInfo (dev, 2);
      DXL_GetModelInfo (dev, 3);
      DXL_GetModelInfo (dev, 4);

      // 登録されたモデルをコンソールに列挙
      DXL_PrintDevicesList ((void *)&printf);

      // この辺りに目的の処理を追記

      // COMポートのクローズ
      DX2_ClosePort (dev);
    }
    // DLLの破棄
    UnloadDLL ();
  }
}
Page Top
LEDのON/OFF anchor.png

サイレントに動作確認を行うには、DYNAMIXELに搭載されているLEDを使うと良いだろう。

ID番号を指定してLEDを明滅させるDXL_SetLED APIを用いて、個々のID番号を順に指定してLEDをON/OFFさせる。なおAPIの処理そのものは一瞬で終わってしまうので、目に見えるように10回のループ内のLEDのONとOFFの後にSleep命令で500msの中断を挿入した。

// LEDのON/OFF
#include <stdio.h>
#define _DYNAMICLOAD
#include "dx2lib.h"

int main (void) {
  if (LoadDLL ()) {
    uint8_t IDs[253];   // 見つかったIDのリスト
    int detectnum = 0;  // 見つかった数

    TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000);
    if (dev) {
      detectnum = DXL_ScanDevices (dev, IDs);
      DXL_PrintDevicesList ((void *)&printf);

      // とりあえず10回繰り返す
      for (int i = 0; i < 10; i++) {
        // 全軸のLEDをONに設定
        printf ("ON ");
        for (int j = 0; j < detectnum; j++) DXL_SetLED (dev, IDs[j], true);
        Sleep (500);
        // 全軸のLEDをOFFに設定
        printf ("OFF ");
        for (int j = 0; j < detectnum; j++) DXL_SetLED (dev, IDs[j], false);
        Sleep (500);
      }

      DX2_ClosePort (dev);
    }
    UnloadDLL ();
  }
}

なおDYNAMIXELに何らかのアラームが発生した事によりLEDが自動点滅している間は、LEDの制御ができない。

Page Top
トルクのON/OFF anchor.png

試運転の際にも行っている通りトルクのONという操作を予め行っておかないと、位置や速度を指令してもホーンは動かない。コントロールテーブル上のアイテム名が「Torque Enable」となっているためにトルクと称しているが、実際には設定された制御モードに従って制御を開始・停止する事を意味している。

DXL_SetTorqueEnablesEquival APIを用いて引数で指定されたID番号の一覧を用いて一斉にトルクのON/OFFを切り替える。

// トルクのON/OFF
#include <stdio.h>
#define _DYNAMICLOAD
#include "dx2lib.h"

int main (void) {
  if (LoadDLL ()) {
    uint8_t IDs[253];   // 見つかったIDのリスト
    int detectnum = 0;  // 見つかった数

    TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000);
    if (dev) {
      detectnum = DXL_ScanDevices (dev, IDs);
      DXL_PrintDevicesList ((void *)&printf);

      // とりあえず意味も無く5回繰り返す
      for (int i = 0; i < 5; i++) {
        // 全軸のトルクをONに設定
        printf ("ON ");
        DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, true);
        Sleep (1000);
        // 全軸のトルクをOFFに設定
        printf ("OFF ");
        DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, false);
        Sleep (1000);
      }

      DX2_ClosePort (dev);
    }
    UnloadDLL ();
  }
}

トルクがOFFの時にDYNAMIXELは脱力し、外力でホーンを動かす事ができる。
またコントロールテーブル中のアイテムの一部にはトルクがONの状態では変更できないものもあるため、APIによっては必要に応じてトルクのOFFを発行しているものがある。

Page Top
角度指令 anchor.png

DYNAMIXELの本分である位置決め制御を行う。

DXL_SetOperatingModesEquival APIを使用して全DYNAMIXELの動作モードを3:Position Control Modeに設定(このAPIは内部でトルクOFFを発行する)。全軸のトルクをON。コメントにあるように1秒かけて全軸の角度を45度へ移動→2秒かけて全軸の角度を-45度へ移動→1秒かけて全軸の角度を0度へ移動。最後に全軸のトルクをOFFにして終了する。foo関数内のDXL_SetGoalAnglesAndTime APIは軸数分の角度の配列と現在の角度からの移動時間を秒で指定するAPIだが、DYNAMIXELへの指令を終えると移動時間を待たずに即時復帰する。そのため直後にSleep関数で移動している間だけプログラムの進捗を意図的に停止させている。

// 角度指令
#include <stdio.h>
#define _DYNAMICLOAD
#include "dx2lib.h"

// 角度指令と時間待ち関数
bool foo (TDeviceID dvid, const uint8_t *ids, const double *angles, int num, double sec) {
  bool result = DXL_SetGoalAnglesAndTime (dvid, ids, angles, num, sec);
  Sleep (sec * 1000); // 移動完了まで待機
  return result;
}

int main (void) {
  if (LoadDLL ()) {
    uint8_t IDs[253];   // 見つかったIDのリスト
    int detectnum = 0;  // 見つかった数
    double angles[253]; // 角度の配列

    TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000);
    if (dev) {
      detectnum = DXL_ScanDevices (dev, IDs);
      DXL_PrintDevicesList ((void *)&printf);

      // 全軸を 3: Position Control に設定
      DXL_SetOperatingModesEquival (dev, IDs, detectnum, 3);
      // 全軸のトルクをONに設定
      DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, true);

      // 1秒かけて全軸の角度を45度へ移動
      printf ("45deg ");
      for (int j = 0; j < detectnum; j++) angles[j] = 45.0;
      foo (dev, IDs, angles, detectnum, 1.0);

      // 2秒かけて全軸の角度を-45度へ移動
      printf ("-45deg ");
      for (int j = 0; j < detectnum; j++) angles[j] = -45.0;
      foo (dev, IDs, angles, detectnum, 2.0);

      // 1秒かけて全軸の角度を0度へ移動
      printf ("0deg ");
      for (int j = 0; j < detectnum; j++) angles[j] = 0.0;
      foo (dev, IDs, angles, detectnum, 1.0);

      // 全軸のトルクをOFFに設定
      DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, false);

      DX2_ClosePort (dev);
    }
    UnloadDLL ();
  }
}

角度や時間を変えて実際の挙動がどのようになるか試して欲しい。

Page Top
角度取得 anchor.png

こちらもDYNAMIXELの本分であるフィードバック値の取得を行う。

全軸のDYNAMIXELのトルクをOFFにして外力でホーンを動かせる状態にした後、DXL_GetPresentAngles APIを使用して複数軸の現在角度を取得し、ID番号と合わせて取得した角度を全軸分表示する。角度の取得と表示を何かしらのキー入力を行うまでループし続ける。

// 角度取得
#include <stdio.h>
#include <conio.h>
#define _DYNAMICLOAD
#include "dx2lib.h"

int main (void) {
  if (LoadDLL ()) {
    uint8_t IDs[253];   // 見つかったIDのリスト
    int detectnum = 0;  // 見つかった数
    double angles[253]; // 角度の配列

    TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000);
    if (dev) {
      detectnum = DXL_ScanDevices (dev, IDs);
      DXL_PrintDevicesList ((void *)&printf);

      // 全軸を 3: Position Control に設定
      DXL_SetOperatingModesEquival (dev, IDs, detectnum, 3);
      DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, false);

      // キー入力があるまでループ
      while (!_kbhit ()) {
        // 全軸の現在角度を取得
        DXL_GetPresentAngles (dev,IDs, angles, detectnum);
        // 取得した角度を表示
        printf ("\r");
        for (int j = 0; j < detectnum; j++) printf("[%d]%6.1f ", IDs[j], angles[j]);
      }

      DX2_ClosePort (dev);
    }
    UnloadDLL ();
  }
}
Page Top
角度取得と角度指令 anchor.png

現在のDYNAMIXELのホーンの位置を取得して他のDYNAMIXELへの角度指令値として利用する。

ID番号が1のDYNAMIXELのトルクをOFFにして外力でホーンを動かせる状態にし、DXL_GetPresentAngle APIで現在のホーンの角度を取得する。その後DXL_SetGoalAngles APIを用いてその角度を全軸のDYNAMIXELへ指令する。ID番号1に対しても角度を指令しているが、予めトルクをOFFにしているのでホーンは動かない。角度の取得と位置の指令を何かしらのキー入力を行うまでループし続ける。

// 角度取得と角度指令
#include <stdio.h>
#include <conio.h>
#define _DYNAMICLOAD
#include "dx2lib.h"

int main (void) {
  if (LoadDLL ()) {
    uint8_t IDs[253];   // 見つかったIDのリスト
    int detectnum = 0;  // 見つかった数
    double angle = 0;   // 現在の角度
    double angles[253]; // 角度の配列

    TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000);
    if (dev) {
      detectnum = DXL_ScanDevices (dev, IDs);
      DXL_PrintDevicesList ((void *)&printf);

      // 全軸を 3: Position Control に設定
      DXL_SetOperatingModesEquival (dev, IDs, detectnum, 3);
      DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, true);

      // ID=1のみトルクをOFFに設定
      DXL_SetTorqueEnable (dev, 1, false);

      // キー入力があるまでループ
      while (!_kbhit ()) {
        // ID=1の現在角度を取得
        DXL_GetPresentAngle (dev, 1, &angle);
        printf("\rPresent Angle = %4.1f  ", angle);
        // 全軸に同じ目標角度を指令
        for (int j = 0; j < detectnum; j++) angles[j] = angle;
        DXL_SetGoalAngles (dev, IDs, angles, detectnum);
      }

      DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, false);
      DX2_ClosePort (dev);
    }
    UnloadDLL ();
  }
}

なお前述のDXL_SetGoalAnglesAndTime APIを使ったプログラムを実行すると、移動時間の情報がDYNAMIXELに残ったままになる。その直後に本プログラムを実行した場合、ID番号1のホーンの動きに他のホーンが即応しないように見える。その場合はDYNAMIXELへ供給していた電源を一旦OFFにした後に再度ONにしてからプログラムを実行する事。

Page Top
角速度指令 anchor.png

最高速度はあまり速くはないが、DYNAMIXELにホイールを装着して台車の足回りに使う事ができる。

DXL_SetOperatingModesEquival APIを使用して全DYNAMIXELの動作モードを1:Velocity Controlに設定(このAPIは内部でトルクOFFを発行する)。全軸のトルクをON。DXL_SetGoalVelocities APIを使用して全軸の角速度を30deg/sに設定→全軸の角速度を-30deg/sに設定→全軸の角速度を0deg/sに設定。最後に全軸のトルクをOFFにして終了する。

// 角速度指令
#include <stdio.h>
#include <conio.h>
#define _DYNAMICLOAD
#include "dx2lib.h"

int main (void) {
  if (LoadDLL ()) {
    uint8_t IDs[253];   // 見つかったIDのリスト
    int detectnum = 0;  // 見つかった数
    double velo[253];   // 角速度の配列

    TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000);
    if (dev) {
      detectnum = DXL_ScanDevices (dev, IDs);
      DXL_PrintDevicesList ((void *)&printf);

      // 全軸を 1: Velocity Control に設定
      DXL_SetOperatingModesEquival (dev, IDs, detectnum, 1);
      DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, true);

      // 全軸の角速度を30deg/sに
      printf ("30deg/s ");
      for (int j = 0; j < detectnum; j++) velo[j] = 30.0;
      DXL_SetGoalVelocities (dev, IDs, velo, detectnum);
      Sleep (5000); // 5秒待ち

      // 全軸の角速度を-30deg/sに
      printf ("-30deg/s ");
      for (int j = 0; j < detectnum; j++) velo[j] = -30.0;
      DXL_SetGoalVelocities (dev, IDs, velo, detectnum);
      Sleep (5000); // 5秒待ち

      // 全軸の角速度を0deg/sに
      printf ("0deg/s ");
      for (int j = 0; j < detectnum; j++) velo[j] = 0.0;
      DXL_SetGoalVelocities (dev, IDs, velo, detectnum);
      Sleep (1000); // 1秒待ち

      DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, false);
      DX2_ClosePort (dev);
    }
    UnloadDLL ();
  }
}
Page Top
モーション再生 anchor.png

角度指令の応用で、各軸に与える角度指令値を時間に応じて複数パターンで変遷させる。

ここでは各DYNAMIXELの目標角度とその角度へ移行させる時間をTMotion構造体とし、複数のTMotion構造体を連続して実行させる事で一連の動きを実現している。またID番号1~8があるものとしてDXL_GetModelInfo APIを使用して検索時間を無くした。
GetQueryPerformanceCounter APIはPCの起動からの時間をmsで取得できるので、モーションで指定された時間を待機する際に利用している。

// モーション再生
#include <stdio.h>
#include <conio.h>
#define _DYNAMICLOAD
#include "dx2lib.h"
#define AXISNUM   (8)   // 軸数
#define MSize(d)  (sizeof(d) / sizeof(d[0]))

// 存在の有無にかかわらず指定軸数分のID番号のテーブル
const uint8_t IDs[AXISNUM] = { 1,  2,  3,  4,  5,  6,  7,  8 };

// 各軸の角度と遷移時間の構造体
typedef struct {
  double  angles[AXISNUM];  // 軸数分の角度情報
  double  sec;              // 遷移時間
} TMotion;

// モーションデータ1
//  8軸を1秒かけて0度(センター)に移動
const TMotion motion1[] = {
  {{   0,   0,   0,   0,   0,   0,   0,   0}, 1.0},
};

// モーションデータ2
//  8軸の5つの様々な型を各秒数かけて移動
const TMotion motion2[] = {
  {{  90, 390,  90,  90,  90,  90,  90,  90}, 3.0},
  {{   0,   0,   0,   0,   0,   0,   0,   0}, 2.0},
  {{-290,-190, -90, -90, -90, -90, -90, -90}, 3.0},
  {{  45, 145,  45,  45,  45,  45,  45,  45}, 3.0},
  {{ -30,-230, -30, -30, -30, -30, -30, -30}, 3.0},
};

// モーション再生
void PLAY (TDeviceID dev, const uint8_t *ids, const TMotion *motion, int framenum) {
  for (int i = 0; i < framenum; i++) {
    double t = GetQueryPerformanceCounter () + motion[i].sec * 1000;
    DXL_SetGoalAnglesAndTime (dev, ids, motion[i].angles, AXISNUM, motion[i].sec);
    while (t > GetQueryPerformanceCounter ()) Sleep (1);
  }
}

int main (void) {
  if (LoadDLL ()) {
    TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000);
    if (dev != 0) {
      for (int i = 0; i < AXISNUM; i++)
        printf ("id=%2d, ModelName=%s\n", IDs[i], DXL_GetModelInfo (dev, IDs[i])->name);

      // 全軸を 4: Expand Position Control に設定
      DXL_SetOperatingModesEquival (dev, IDs, AXISNUM, 4);
      DXL_SetTorqueEnablesEquival (dev, IDs, AXISNUM, true);

      // motion1を再生
      printf("motion1 ");
      PLAY (dev, IDs, motion1, MSize (motion1));

      // motion2を再生
      printf("motion2 ");
      PLAY (dev, IDs, motion2, MSize (motion2));

      DXL_SetTorqueEnablesEquival (dev, IDs, AXISNUM, false);
      DX2_ClosePort (dev);
    }
    UnloadDLL ();
  }
}
Page Top
並列処理 anchor.png

1つのパケット処理にかかる時間が短ければ単位時間当たりにこなせるパケット数が増えるため、自ずと通信速度が高い環境が望まれる。その恩恵に与かれるものとしてOSの並列処理が挙げられる。

ここでは角度と電流値を取得してモニタし続ける処理をthread1、角度を指令し続ける処理をthread2といった具合に個別のスレッドに割り当てた。下準備と終了処理はmain内で行っているが、それ以外は各々のスレッドが各々独立して処理している。
各々のスレッドに使用しているAPIが互いを意識しないコードになっているのは、DX2LIBのAPIが排他的に処理されるように作られているためである。しかし1つのスレッドでAPIを無限ループで遅滞なく呼び出し続けられてしまうとリソースを独占されてしまうため、ループ中にSleepを入れて適宜コンテキストスイッチを促している。

// マルチスレッド
#include <stdio.h>
#include <conio.h>
#define _DYNAMICLOAD
#include "dx2lib.h"
#include <pthread.h>
#include <math.h>

bool term = false;    // スレッド終了フラグ

// スレッドに渡すパラメータ
typedef struct {
  TDeviceID dev;      // デバイスID
  const uint8_t *ids; // IDの配列
  int num;            // ID数
} TThInfo;

// スレッド1 (モニタ)
void *thread1 (TThInfo *info) {
  double pangle[info->num], pcur[info->num];

  while (!term) {
    // 現在位置取得
    if (DXL_GetPresentAngles (info->dev, info->ids, pangle, info->num)) {
      // 現在電流取得
      if (DXL_GetPresentCurrents(info->dev, info->ids, pcur, info->num)) {
        printf("\r");
        for (int i = 0; i < info->num; i++)
          printf ("(%d:%5.1f %6.1f) ", info->ids[i], pangle[i], pcur[i]);
      }
    }
    Sleep (5);
  }
  return (void *)NULL;
}

// スレッド2 (正弦波で角度指令)
void *thread2 (TThInfo *info) {
  double ang, angle[info->num];

  while (!term) {
    // 時間から±60度の角度を生成 (4秒周期)
    ang = sin (0.5 * M_PI * GetQueryPerformanceCounter () * 0.001) * 60.0;
    for (int i = 0; i < info->num; i++) angle[i] = ang;
    DXL_SetGoalAngles (info->dev, info->ids, angle, info->num);
    Sleep (5);
  }
  return (void *)NULL;
}

int main (void) {
  pthread_t th1, th2;

  if (LoadDLL ()) {
    uint8_t IDs[253];
    int detectnum = 0;

    TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000);
    if (dev) {
      detectnum = DXL_ScanDevices (dev, IDs);
      DXL_PrintDevicesList ((void *)&printf);

      // 全軸を 3: Position Control に設定
      DXL_SetOperatingModesEquival (dev, IDs, detectnum, 3);
      DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, true);

      // 2つのスレッドを作成
      TThInfo info = { dev, IDs, detectnum };
      pthread_create(&th1, NULL, (void *)thread1, (void *)&info);
      pthread_create(&th2, NULL, (void *)thread2, (void *)&info);

      // キー入力があるまでループ
      while (!_kbhit ()) {
        Sleep (100);
      }

      // スレッド終了を励起
      term = true;

      // スレッド終了待ち
      pthread_join(th1, NULL);
      pthread_join(th2, NULL);

      DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, false);
      DX2_ClosePort (dev);
    }
    UnloadDLL ();
  }
}

同時に他のアプリケーションを使用すると、大抵の場合DYNAMIXELの挙動がおかしくなる。実際にはプログラムの実行が他のアプリケーションによって阻害され、 単純なループで高速に角度指令を行っている処理(こういった角度指令の仕方は旧来のDYNAMIXELの常套手段だった)が間延びしてしまう事が原因だ。マルチタスク環境において何がどのように影響するかを微調整する術はあるので、時間があるときにでも調べて欲しい。
またあえて諸々の通信速度を9600bpsに変更した場合も、最終的な動作にどの程度影響するかを試しても面白い。
その結果、Windows上でいわゆるリアルタイム性を担保するのは一筋縄ではいかない事が分かってもらえれば良いかと。

Page Top

他の言語 anchor.png

Page Top
Pythonその1 anchor.png

DX2LIBのページに紹介されている通りだが、その中でも流行のPythonで遊んでみても面白いかと思う。
GCC Developer LiteにはPythonが含まれており、コンパイルオプションで「Python(32bit)」や「Python(64bit)」を選ぶ事でライブラリに同梱されるPythonのサンプルソースコードが利用できる。しかし追加APIを利用したものばかりで、コントロールテーブルを読み書きする基本的なAPIは使われていない。

ここでは基本的なAPIのみを使った例を紹介するが、dx2lib.pyはC言語で作成されたライブラリをPythonで利用するためだけの記述がなされているため、Pythonならではの柔軟なコードがctypesによって変に厳格化されて使い勝手が悪い。
ひとまずDynamixel Xシリーズを前提とし、主要なアイテムのアドレスとコントロールテーブルを宣言してみた。

from ctypes import *

ADDRESS_X_MODEL_NUMBER          = 0   #W
ADDRESS_X_MODEL_INFORMATION     = 4   #L
ADDRESS_X_VERSION_FW            = 6   #B
ADDRESS_X_ID                    = 7   #B
ADDRESS_X_BAUDRATE              = 8   #B
ADDRESS_X_RETURN_DELAY_TIME     = 9   #B
ADDRESS_X_DRIVE_MODE            = 10  #B
ADDRESS_X_OPERATIING_MODE       = 11  #B
ADDRESS_X_HOMING_OFFSET         = 20  #L
ADDRESS_X_MAX_POSITION_LIMIT    = 48  #L
ADDRESS_X_MIN_POSITION_LIMIT    = 52  #L
ADDRESS_X_TORQUE_ENABLE         = 64  #B
ADDRESS_X_LED_RED               = 65  #B
ADDRESS_X_STATUS_RETURN_LEVEL   = 69  #B
ADDRESS_X_HARDWARE_ERROR_STATUS = 70  #B
ADDRESS_X_VELOCITY_IGAIN        = 76  #W
ADDRESS_X_VELOCITY_PGAIN        = 78  #W
ADDRESS_X_POSITION_DGAIN        = 80  #W
ADDRESS_X_POSITION_IGAIN        = 82  #W
ADDRESS_X_POSITION_PGAIN        = 84  #W
ADDRESS_X_GOAL_PWM              = 100 #W
ADDRESS_X_GOAL_CURRENT          = 102 #W
ADDRESS_X_GOAL_VELOCITY         = 104 #L
ADDRESS_X_PROF_ACCELERATION     = 108 #L
ADDRESS_X_PROF_VELOCITY         = 112 #L
ADDRESS_X_GOAL_POSITION         = 116 #L
ADDRESS_X_MOVING                = 122 #B
ADDRESS_X_MOVING_STATUS         = 123 #B
ADDRESS_X_PRESENT_PWM           = 124 #W
ADDRESS_X_PRESENT_CURRENT       = 126 #W
ADDRESS_X_PRESENT_VELOCITY      = 128 #L
ADDRESS_X_PRESENT_POSITION      = 132 #L
ADDRESS_X_VELOCITY_TRAJECTORY   = 136 #L
ADDRESS_X_POSITION_TRAJECTORY   = 140 #L
ADDRESS_X_PRESENT_VOLTAGE       = 144 #W
ADDRESS_X_PRESENT_TEMP          = 146 #B

class XCtrlTable(Structure):
  _pack_ = 1
  _fields_ = [
    ('ModelNumber',                 c_uint16),
    ('ModelInformation',            c_uint32),
    ('VersionofFirmware',           c_uint8),
    ('ID',                          c_uint8),
    ('Baudrate',                    c_uint8),
    ('ReturnDelayTime',             c_uint8),
    ('DriveMode',                   c_uint8),
    ('OperatingMode',               c_uint8),
    ('SecondaryID',                 c_uint8),
    ('ProtocolVersion',             c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('HomingOffset',                c_int32),
    ('MovingThreshold',             c_uint32),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('TemperatureLimit',            c_uint8),
    ('MaxVoltageLimit',             c_uint16),
    ('MinVoltageLimit',             c_uint16),
    ('PWMLimit',                    c_uint16),
    ('CurrentLimit',                c_uint16),
    ('AccelerationLimit',           c_uint32),
    ('VelocityLimit',               c_uint32),
    ('MaxPositionLimit',            c_uint32),
    ('MinPositionLimit',            c_uint32),
    ('ExternalPortMode1',           c_uint8),
    ('ExternalPortMode2',           c_uint8),
    ('ExternalPortMode3',           c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('Shutdown',                    c_uint8),
    ('TorqueEnable',                c_uint8),
    ('LED',                         c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('StatusReturnLevel',           c_uint8),
    ('RegisteredInstruction',       c_uint8),
    ('HardwareErrorStatus',         c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('VelocityIGain',               c_uint16),
    ('VelocityPGain',               c_uint16),
    ('PositionDGain',               c_uint16),
    ('PositionIGain',               c_uint16),
    ('PositionPGain',               c_uint16),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('FeedforwardAccelerationGain', c_uint16),
    ('FeedforwardVelocityGain, ',   c_uint16),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('BusWatchdog',                 c_int8),
    ('reserve',                     c_uint8),
    ('GoalPWM',                     c_int16),
    ('GoalCurrent',                 c_int16),
    ('GoalVelocity',                c_int32),
    ('ProfileAcceleration',         c_uint32),
    ('ProfileVelocity',             c_uint32),
    ('GoalPosition',                c_int32),
    ('RealtimeTick',                c_uint16),
    ('Moving',                      c_uint8),
    ('MovingStatus',                c_uint8),
    ('PresentPWM',                  c_int16),
    ('PresentCurrent',              c_int16),
    ('PresentVelocity',             c_int32),
    ('PresentPosition',             c_int32),
    ('VelocityTrajectory',          c_uint32),
    ('PositionTrajectory',          c_uint32),
    ('PresentInputVoltage',         c_uint16),
    ('PresentTemperature',          c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('reserve',                     c_uint8),
    ('ExternalPortData1',           c_uint16),
    ('ExternalPortData2',           c_uint16),
    ('ExternalPortData3',           c_uint16)]

class VelocityGain(Structure):
  _pack_ = 1
  _fields_ = [
    ('VelocityIGain',               c_uint16),
    ('VelocityPGain',               c_uint16)]

class PositionGain(Structure):
  _pack_ = 1
  _fields_ = [
    ('PositionDGain',               c_uint16),
    ('PositionIGain',               c_uint16),
    ('PositionPGain',               c_uint16)]

例えばPresent PositionをID=1のDynamixelのコントロールテーブルから読み出すには、以下のコードで対応できる。

from dx2lib import *

dev = DX2_OpenPort (b'\\\\.\\COM3', 57600)
if dev != None:
  ppos = c_int32()
  err = c_uint16()
  DX2_ReadLongData (dev, 1, ADDRESS_X_PRESENT_POSITION, cast(byref(ppos), POINTER(c_uint32)), err)
  DX2_ClosePort (dev)

更にXCtrlTable構造体を用いて変数を定義し、その変数へID=1のDynamixelのコントロールテーブルから一括読み出した値を保存する場合はDX2_ReadBlockDataを用いる。

from dx2lib import *

dev = DX2_OpenPort (b'\\\\.\\COM3', 57600)
if dev != None:
  tbl = XCtrlTable()
  err = c_uint16()
  if DX2_ReadBlockData (dev, 1, ADDRESS_X_MODEL_NUMBER, cast(byref(tbl), POINTER(c_uint8)), sizeof(tbl), err):
    for field in tbl._fields_:
      if field[0] != 'reserve':
        print (tbl.__class__.__dict__[field[0]].offset, field[0], '=', getattr(tbl, field[0]))
  DX2_ClosePort (dev)

位置決め制御時のPIDゲインを設定する等の連続したアドレスに割り当てられた複数のアイテムを同時に書き換える場合はDX2_WriteBlockDataを用いる。

from dx2lib import *

dev = DX2_OpenPort (b'\\\\.\\COM3', 57600)
if dev != None:
  gain = PositionGain(0, 0, 400)
  DX2_WriteBlockData (dev, 1, ADDRESS_X_POSITION_DGAIN, cast(byref(gain), POINTER(c_uint8)), sizeof (gain), None)
  DX2_ClosePort (dev)

追加APIに無い機能は基本APIを使ってコントロールテーブルへ直接アクセスするしかないため、参考にしてもらえればと思う。

Page Top
Pythonその2 anchor.png

全てPythonで書いてしまえば、態々Cで作られたライブラリをリンクしたり、ctypesの有象無象に呵まれる事も無くスッキリする。という事でインストラクションパケットの生成と送信、ステータスパケットの受信、インストラクションの種類によってパラメータ部分を結合・分解するクラスを作ってみた。前記の内容とは一線を画すので、互換性は一切考えていない。 pySerialは必須。
なお単体で試せるようにデモコードを付けておいた。ターゲットはXM/XH/XDシリーズ、IDは1~5(少なくともID=1は必須)、ボーレートは3Mbpsを想定、デバッグ用にpsutilが必要、マルチスレッドでアクセスといったところだ。プロトコルV1に対応したDynamixelの場合は末尾のコメントになっているコードを利用してもらえればと。
一応関数に与えられたパラメータを確認しているが、全領域で検証したものではない。また値の正負については勝手に判断している部分もあるので、都合が悪ければ適宜修正してもらえればと思う。

ソースへの直リンク

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# pyDXL.py
# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: (C) 2024 mukyokyo

import serial, threading, multiprocessing, array, struct
from typing import Union
from collections import namedtuple
from struct import pack, unpack, iter_unpack

##########################################################
# Functionalized the part of converting int to bytes.
# If specified as a tuple, it is converted to bytes at once.
##########################################################
def B2Bs(d) -> bytes:
  if isinstance(d, list) or isinstance(d, tuple):
    return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d)
  else:
    return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d))

def W2Bs(d) -> bytes:
  if isinstance(d, list) or isinstance(d, tuple):
    return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]])
  else:
    return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d))

def L2Bs(d) -> bytes:
  if isinstance(d, list) or isinstance(d, tuple):
    return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]])
  else:
    return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d))

##########################################################
# API for Dynamixel protocol V1
##########################################################
class DXLProtocolV1:
  BROADCASTING_ID = 0xfe
  INST_PING           = 0x01
  INST_READ           = 0x02
  INST_WRITE          = 0x03
  INST_REG_WRITE      = 0x04
  INST_ACTION         = 0x05
  INST_FACTORY_RESET  = 0x06
  INST_REBOOT         = 0x08
  INST_SYNC_WRITE     = 0x83
  INST_SYNG_REG_WRITE = 0x85

  TSyncW = namedtuple("TSyncW", ("id", ("data")))

  def __init__(self, port : Union[serial.Serial, str], baudrate = 57600, timeout = 0.05, lock = None):
    """
    Initalize

    parameters
    -------------
    port : str
      Device name
    baudrate : int
      Serial baudrate[bps]
    timeout : float
      Read timeout[s]
    """
    if isinstance(port, serial.Serial):
      self.__serial = port
      self.__baudrate = port.baudrate
      self.__timeout = port.timeout
    else:
      self.__serial = serial.Serial(port, baudrate = baudrate, timeout = timeout)
      self.__baudrate = self.__serial.baudrate
      self.__timeout = self.__serial.timeout
    if lock == None:
      self.__lock = threading.Lock()
    else:
      self.__lock = lock
    self.__Error = 0

  @property
  def lock(self):
    return self.__lock

  @property
  def baudrate(self):
    return self.__serial.baudrate

  @baudrate.setter
  def baudrate(self, baudrate):
    self.__baudrate = baudrate
    self.__serial.baudrate = baudrate

  @property
  def timeout(self):
    return self.__serial.timeout

  @timeout.setter
  def timeout(self, timeout):
    self.__timeout = timeout
    self.__serial.timeout = timeout

  def __reconfig(self):
    self.__serial.baudrate = self.__baudrate
    self.__serial.timeout = self.__timeout

  @property
  def Error(self):
    return self.__Error

  def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> (bytes, bool):
    """
    Sending packets

    parameters
    -------------
    id : int
      Target ID
    inst : int
      Instruction command
    param : bytes
      Packet parameters

    Returns
    -------
    bytes
      Packets sent
    bool
      Success or failure
    """
    self.__reconfig()
    if ((id == self.BROADCASTING_ID) or (id >= 0 and id <= 253)) and len(param) <= (256 - 6):
      instp = bytearray([0xff,0xff,id,0,inst]) + bytes(param)
      instp[3] = len(instp) - 3
      instp += B2Bs(~sum(instp[2:]) & 0xff)
      self.__serial.reset_input_buffer()
      if echo: print('TX:', instp.hex(':'))
      self.__serial.write(instp)
      return bytes(instp), True
    return None, False

  def __rx(self, length) -> bytes:
    s = self.__serial.read(length)
    l = len(s)
    if l == length:
      return s
    else:
      r = s
      length -= l
      if length > 0:
        while self.__serial.in_waiting > 0:
          s = self.__serial.read(length)
          r += s
          length -= len(s)
          if length == 0:
            break
      return r

  def RxPacket(self, echo = False) -> (bytes, bool):
    """
    Receiving packets

    Returns
    -------
    bytes
      Packets received
    bool
      Success or failure
    """
    statp = self.__rx(5)
    if statp:
      if len(statp) == 5:
        if statp[0] == 0xff and statp[1] == 0xff:
          l = statp[3] - 1
          self.__Error = statp[4]
          statp += self.__rx(l)
          if len(statp) == l + 5:
            if statp[-1:][0] == ((~sum(statp[2:-1])) & 0xff):
              if echo: print('RX:', statp.hex(':'))
              return bytes(statp), (statp[4] & 0x40) == 0
    return None, False

  def Write(self, id : int, addr : int, data : bytes, echo = False) -> bool:
    """
    Write instruction

    parameters
    -------------
    id : int
      Target ID
    addr : int
      Target item address
    data : bytes
      Data to be written

    Returns
    -------
    result : bool
      Success or failure
    """
    with self.__lock:
      if id >= 0 and id <= self.BROADCASTING_ID and addr >= 0  and addr <= 254:
        if self.TxPacket(id, self.INST_WRITE, B2Bs(addr) + data, echo)[1]:
          if id != self.BROADCASTING_ID:
            dat, r = self.RxPacket(echo)
            if r:
              return dat[2] == id and (dat[4] & 0x18) == 0
          else:
            return True
      return False

  def Write8(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool:
    return self.Write(id, addr, B2Bs(data), echo)

  def Write16(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool:
    return self.Write(id, addr, W2Bs(data), echo)

  def Write32(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool:
    return self.Write(id, addr, L2Bs(data), echo)

  def Read(self, id : int, addr : int, length : int, echo = False) -> bytes:
    """
    Read instruction

    parameters
    -------------
    id : int
      Target ID
    addr : int
      Target item address
    length : int
      Number of bytes to read

    Returns
    -------
    bytes
      Data read
    """
    with self.__lock:
      if id >= 0 and id <= 253 and addr >= 0 and addr <= 254 and length > 0 and length <= (256 - 6):
        if self.TxPacket(id, self.INST_READ, B2Bs((addr, length)), echo)[1]:
          dat, r = self.RxPacket(echo)
          if r:
            if dat[2] == id and (dat[4] & 0x8) == 0:
              return bytes(dat[5:-1])
      return None

  def Read8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
    r = self.Read(id, addr, length, echo)
    if r != None:
      n = sum(iter_unpack('b' if signed else 'B', r), ())
      return n if length > 1 else n[0]
    return None

  def Read16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
    r = self.Read(id, addr, 2 << (length - 1), echo)
    if r != None:
      n = sum(iter_unpack('h' if signed else 'H', r), ())
      return n if length > 1 else n[0]
    return None

  def Read32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
    r = self.Read(id, addr, 4 << (length - 1), echo)
    if r != None:
      n = sum(iter_unpack('i' if signed else 'I', r), ())
      return n if length > 1 else n[0]
    return None

  def SyncWrite(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool:
    """
    Sync Write instruction

    parameters
    -------------
    addr : int
      Target item address
    length : int
      Number of bytes to write
    id_datas : (TSyncW)
      Target ID and data

    Returns
    -------
    bool
      Success or failure
    """
    with self.__lock:
      if addr >= 0 and addr <= 254 and length > 0 and length < (256 - 6):
        param = B2Bs((addr,length))
        for d in id_datas:
          param += B2Bs(d.id) + d.data
          if len(d.data) != length or d.id < 0 or d.id > 253:
            del param
            return False
        return self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)[1]
      return False

  def Ping(self, id : int, echo = False) -> bool:
    with self.__lock:
      if self.TxPacket(id, self.INST_PING, bytes(), echo)[1]:
        dat, r = self.RxPacket(echo)
        if r:
          return id == dat[2] and dat[3] == 2
      return False

  def FactoryReset(self, id : int, echo = False) -> bool:
    with self.__lock:
      if self.TxPacket(id, self.INST_FACTORY_RESET, bytes(), echo)[1]:
        dat, r = self.RxPacket(echo)
        if r:
          return id == dat[2] and dat[3] == 2
      return False

  def Reboot(self, id : int, echo = False) -> bool:
    with self.__lock:
      if self.TxPacket(id, self.INST_REBOOT, bytes(), echo)[1]:
        dat, r = self.RxPacket(echo)
        if r:
          return id == dat[2] and dat[3] == 2
      return False

##########################################################
# API for Dynamixel protocol V2
##########################################################
class DXLProtocolV2:
  BROADCASTING_ID = 0xfe
  INST_PING                 = 0x01
  INST_READ                 = 0x02
  INST_WRITE                = 0x03
  INST_REG_WRITE            = 0x04
  INST_ACTION               = 0x05
  INST_FACTORY_RESET        = 0x06
  INST_REBOOT               = 0x08
  INST_SYS_WRITE            = 0x0d
  INST_CLEAR                = 0x10
  INST_CONTROL_TABLE_BACKUP = 0x20
  INST_STATUS               = 0x55
  INST_SYNC_READ            = 0x82
  INST_SYNC_WRITE           = 0x83
  INST_SYNG_REG_WRITE       = 0x85
  INST_FAST_SYNC_READ       = 0x8a
  INST_BULK_READ            = 0x92
  INST_BULK_WRITE           = 0x93
  INST_FAST_BULK_READ       = 0x9a

  TSyncW = namedtuple("TSyncW", ("id", ("data")))
  TBulkW = namedtuple("TBulkW", ("id", "addr", ("data")))
  TBulkR = namedtuple("TBulkR", ("id", "addr", "length"))

  __crc16_lutable = array.array('H')

  def __init__(self, port : Union[serial.Serial, str], baudrate = 57600, timeout = 0.05, lock = None):
    """
    Initalize

    parameters
    -------------
    port : str
      Device name
    baudrate : int
      Serial baudrate[bps]
    timeout : float
      Read timeout[s]
    """
    if isinstance(port, serial.Serial):
      self.__serial = port
      self.__baudrate = port.baudrate
      self.__timeout = port.timeout
    else:
      self.__serial = serial.Serial(port, baudrate = baudrate, timeout = timeout)
      self.__baudrate = self.__serial.baudrate
      self.__timeout = self.__serial.timeout
    if lock == None:
      self.__lock = threading.Lock()
    else:
      self.__lock = lock
    self.__Error = 0
    poly = 0x8005
    for i in range(256):
      nData = i << 8
      nAccum = 0
      for j in range(8):
        nAccum = ((nAccum << 1) ^ poly if (nData ^ nAccum) & 0x8000 else nAccum << 1) & 0xffff
        nData <<= 1
      self.__crc16_lutable.append(nAccum)

  @property
  def lock(self):
    return self.__lock

  @property
  def baudrate(self):
    return self.__serial.baudrate

  @baudrate.setter
  def baudrate(self, baudrate):
    self.__baudrate = baudrate
    self.__serial.baudrate = baudrate

  @property
  def timeout(self):
    return self.__serial.timeout

  @timeout.setter
  def timeout(self, timeout):
    self.__timeout = timeout
    self.__serial.timeout = timeout

  def __reconfig(self):
    self.__serial.baudrate = self.__baudrate
    self.__serial.timeout = self.__timeout

  @property
  def Error(self):
    return self.__Error

  def __crc16(self, data : bytes) -> int:
    crc = 0
    for d in data:
      crc = (crc << 8) ^ self.__crc16_lutable[(((crc >> 8) ^ d) & 0xff)]
    return crc & 0xffff

  def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> (bytes, bool):
    """
    Sending packets

    parameters
    -------------
    id : int
      Target ID
    inst : int
      Instruction command
    param : bytes
      Packet parameters

    Returns
    -------
    bytes
      Packets sent
    bool
      Success or failure
    """
    self.__reconfig()
    if ((id == self.BROADCASTING_ID) or (id >= 0 and id <= 252)) and len(param) < (65536 - 10):
      instp = bytearray([0xff,0xff,0xfd,0x00,id,0,0,inst]) + bytearray(param).replace(b'\xff\xff\xfd',b'\xff\xff\xfd\xfd')
      instp[5:7] = W2Bs(len(instp) - 5)
      instp += W2Bs(self.__crc16(instp))
      self.__serial.reset_input_buffer()
      if echo: print('TX:', instp.hex(':'))
      self.__serial.write(instp)
      return bytes(instp), True
    return None, False

  def __rx(self, length) -> bytes:
    s = self.__serial.read(length)
    l = len(s)
    if l == length:
      return s
    else:
      r = s
      length -= l
      if length > 0:
        while self.__serial.in_waiting > 0:
          s = self.__serial.read(length)
          r += s
          length -= len(s)
          if length == 0:
            break
      return r

  def RxPacket(self, echo = False) -> (bytes, bool):
    """
    Receiving packets

    Returns
    -------
    bytes
      Packets received
    bool
      Success or failure
    """
    self.__serial.flush()
    statp = self.__rx(9)
    if statp:
      if len(statp) == 9:
        if statp[0] == 0xff and statp[1] == 0xff and statp[2] == 0xfd and statp[3] == 0 and statp[7] == 0x55:
          l = unpack('<H', statp[5:7])[0] - 2
          self.__Error = statp[8]
          statp += self.__rx(l)
          if len(statp) == l + 9:
            if unpack('<H', statp[-2:])[0] == self.__crc16(statp[:-2]):
              statp = statp[0:9] + statp[9:].replace(b'\xff\xff\xfd\xfd',b'\xff\xff\xfd')
              if echo: print('RX:', statp.hex(':'))
              return bytes(statp), (statp[8] & 0x7f) == 0
    return None, False

  def Write(self, id : int, addr : int, data : bytes, echo = False) -> bool:
    """
    Write instruction

    parameters
    -------------
    id : int
      Target ID
    addr : int
      Target item address
    data : bytes
      Data to be written

    Returns
    -------
    result : bool
      Success or failure
    """
    with self.__lock:
      if ((id >= 0 and id <= 252) or (id == self.BROADCASTING_ID)) and addr >= 0 and addr <= 65535:
        if self.TxPacket(id, self.INST_WRITE, W2Bs(addr) + data, echo)[1]:
          if id != self.BROADCASTING_ID:
            dat, r = self.RxPacket(echo)
            if r:
              return dat[4] == id and (dat[8] & 0x7f) == 0
          else:
            return True
      return False

  def Write8(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool:
    return self.Write(id, addr, B2Bs(data), echo)

  def Write16(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool:
    return self.Write(id, addr, W2Bs(data), echo)

  def Write32(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool:
    return self.Write(id, addr, L2Bs(data), echo)

  def Read(self, id : int, addr : int, length : int, echo = False) -> bytes:
    """
    Read instruction

    parameters
    -------------
    id : int
      Target ID
    addr : int
      Target item address
    length : int
      Number of bytes to read

    Returns
    -------
    bytes
      Data read
    bool
      Success or failure
    """
    with self.__lock:
      if id >= 0 and id <= 252 and addr >= 0 and addr <= 65535 and length > 0 and length < (65536 - 10):
        if self.TxPacket(id, self.INST_READ, W2Bs(addr) + W2Bs(length), echo)[1]:
          dat, r = self.RxPacket(echo)
          if r:
            return bytes(dat[9:-2])
      return None

  def Read8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
    r = self.Read(id, addr, length, echo)
    if r != None:
      n = sum(iter_unpack('b' if signed else 'B', r), ())
      return n if length > 1 else n[0]
    return None

  def Read16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
    r = self.Read(id, addr, 2 << (length - 1), echo)
    if r != None:
      n = sum(iter_unpack('h' if signed else 'H', r), ())
      return n if length > 1 else n[0]
    return None

  def Read32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
    r = self.Read(id, addr, 4 << (length - 1), echo)
    if r != None:
      n = sum(iter_unpack('i' if signed else 'I', r), ())
      return n if length > 1 else n[0]
    return None

  def SyncWrite(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool:
    """
    Sync Write instruction

    parameters
    -------------
    addr : int
      Target item address
    length : int
      Number of bytes to write
    id_datas : (TSyncW)
      Target ID and data

    Returns
    -------
    bool
      Success or failure
    """
    with self.__lock:
      if addr >= 0 and addr <= 65535 and length > 0 and length < (65536 - 10):
        param = W2Bs(addr) + W2Bs(length)
        for d in id_datas:
          param += bytes((d.id,)) + d.data
          if len(d.data) != length or d.id < 0 or d.id > 252:
            del param
            return False
        return self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)[1]
      return False

  def SyncRead(self, addr : int, length : int, ids : (int), echo = False) -> tuple:
    """
    Sync Read instruction

    parameters
    -------------
    addr : int
      Target item address
    length : int
      Number of bytes to read
    ids : (int)
      Target IDs

    Returns
    -------
    tuple
      Read ID and data
    """
    with self.__lock:
      result = ()
      if addr >= 0 and addr < 65535 and length > 0 and length < (65536 - 10):
        if self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_READ, W2Bs(addr) + W2Bs(length) + B2Bs(ids), echo)[1]:
          for id in ids:
            dat, r = self.RxPacket(echo)
            if r:
              if dat[4] == id:
                result += (id, bytes(dat[9:9 + length])),
              else:
                result += (id, bytes([])),
            else:
              result += (id, bytes([])),
      return result

  def BulkWrite(self, data : (TBulkW), echo = False) -> bool:
    """
    Bulk Write instruction

    parameters
    -------------
    data : (TBulkW)
      Target ID and address, data
    ids : (TSyncW)
      Target ID and data

    Returns
    -------
    bool
      Success or failure
    """
    with self.__lock:
      param = bytes()
      for d in data:
        if d.id >= 0 and d.id <= 252 and d.addr >= 0 and d.addr <= 65535:
          param += B2Bs(d.id) + W2Bs(d.addr) + W2Bs(len(d.data)) + d.data
        else:
          del param
          return False
      return self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_WRITE, param, echo)[1]

  def BulkRead(self, data:(TBulkR), echo = False) -> tuple:
    """
    Bulk Read instruction

    parameters
    -------------
    data : (TBulkR)
      ID, address, and number of bytes to be read

    Returns
    -------
    tuple
      Read ID and data
    """
    with self.__lock:
      result = ()
      param = bytes()
      for d in data:
        if d.id < 0 or d.addr < 0 or d.length < 0:
          return result
        param += B2Bs(d.id) + W2Bs(d.addr) + W2Bs(d.length)
      if self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_READ, param, echo)[1]:
        for d in data:
          rxd, r = self.RxPacket(echo)
          if r:
            if(d.id == rxd[4]):
              result += (d.id, bytes(rxd[9:-2])),
            else:
              result += (d.id, bytes([])),
          else:
            result += (d.id, bytes([])),
      del param
      return result

  def Ping(self, id : int, echo = False) -> bool:
    with self.__lock:
      if self.TxPacket(id, self.INST_PING, bytes(), echo)[1]:
        rxd, r = self.RxPacket(echo)
        if r:
          return id == rxd[4] and rxd[5] == 7 and rxd[6] == 0
      return False

  def FactoryReset(self, id : int, p1 : int, echo = False) -> bool:
    with self.__lock:
      if self.TxPacket(id, self.INST_FACTORY_RESET, bytes((p1,)), echo)[1]:
        rxd, r = self.RxPacket(echo)
        if r:
          return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0
      return False

  def Reboot(self, id : int, echo = False) -> bool:
    with self.__lock:
      if self.TxPacket(id, self.INST_REBOOT, bytes(), echo)[1]:
        rxd, r = self.RxPacket(echo)
        if r:
          return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0
      return False

  def Clear(self, id : int, val, echo = False) -> bool:
    with self.__lock:
      if self.TxPacket(id, self.INST_CLEAR, B2Bs(1) + L2Bs(val), echo)[1]:
        rxd, r = self.RxPacket(echo)
        if r:
          return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0
      return False

##########################################################
# test code
##########################################################
if __name__ == "__main__":
  from contextlib import contextmanager
  from threading import Thread
  import time, gc, psutil, os

  ec = False
  ID = 1
  fin = False

  @contextmanager
  def stopwatch():
    start_time = time.time()
    yield
    #print('..proc time={:.1f}ms {}'.format((time.time() - start_time) * 1000, psutil.Process(os.getpid()).memory_info().rss))

  def func1(dx):
    global ID, fin, ec

    try:
      '''
      """ reset dxl """
      with stopwatch():
        r = dx.FactoryReset(ID, 0xff, echo = ec)
      print(f'FactoryReset({ID}): {r}')
      time.sleep(0.5)
      '''
 
      """ ping dxl """
      print(f'Ping({ID})=', dx.Ping(ID, echo = ec))

      """ reboot dxl """
      with stopwatch():
        r = dx.Reboot(ID, echo = ec)
      print(f'Reboot({ID})={r}')
      time.sleep(0.5)

      """ basic packet proc """
      with stopwatch():
        with dx.mutex:
          r0 = dx.TxPacket(ID, dx.INST_WRITE, (65, 0, 1))
          r1 = dx.RxPacket()
      print(f'TxPacket({ID})={r0[1]}', r0[0].hex(':'))
      if r1[0]:
        print(f'RxPacket({ID})={r1[1]}', r1[0].hex(':'))

      """ dump memory """
      l = 50
      for addr in range(0, 700, l):
        r = dx.Read(ID, addr, l, echo = ec)
        print(f'Read({addr};{l})=',r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!')

      """ read byte item """
      with stopwatch():
        r = dx.Read8(ID, 65, echo = ec)
      print(f'Read8({ID})={r}')

      """ sync read inst. """
      print('SyncRead=')
      with stopwatch():
        d = dx.SyncRead(0, 4, (1, 2, 3, 4, 5), echo = ec)
      for d in d:
        print(f' ({d[0]}) 0,4 hex=', d[1].hex(':') if len(d[1]) > 0 else ())
      with stopwatch():
        d = dx.SyncRead(132, 4, (1, 2, 3, 4, 5), echo = ec)
      for d in d:
        print(f' ({d[0]}) 132,4 int32=', unpack('<i', pack('<I', unpack('<I', d[1])[0]))[0] if len(d[1]) > 0 else ())

      """ sync write inst. """
      for i in range(30):
        with stopwatch():
          dx.SyncWrite(65, 1, (dx.TSyncW(1, B2Bs(1)), dx.TSyncW(2, B2Bs(1)), dx.TSyncW(3, B2Bs(1)), dx.TSyncW(4, B2Bs(1)), dx.TSyncW(5, B2Bs(1))), echo = ec)
        time.sleep(0.05)
        with stopwatch():
          dx.SyncWrite(65, 1, (dx.TSyncW(1, B2Bs(0)), dx.TSyncW(2, B2Bs(0)), dx.TSyncW(3, B2Bs(0)), dx.TSyncW(4, B2Bs(0)), dx.TSyncW(5, B2Bs(0))), echo = ec)
        time.sleep(0.05)

      """ set goal position """
      #torque off
      if dx.Write8(ID, 64, 0, echo = ec):
        #multi turn
        if dx.Write8(ID, 11, 4, echo = ec):
          #torque on
          if dx.Write8(ID, 64, 1, echo = ec):
            print(f'Write32/Read32({ID})')
            for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)):
              for i in range(10):
                #set goal position
                with stopwatch():
                  dx.Write32(ID, 116, gp, echo = ec)
                #get present position
                with stopwatch():
                  pp = dx.Read32(ID, 132, signed = True, echo = ec)
                if pp != None:
                  print(f' w={gp:6} r={pp:6} diff={gp-pp:5}', end='\r')
                else:
                  pass
                  print('None                            ', end='\r')
                  break
                time.sleep(0.01)
              else:
                continue
              pass
              break
            print('')
            #torque off
            with stopwatch():
              dx.Write8(ID, 64, 0, echo = ec)
            #normal turn
            with stopwatch():
              dx.Write8(ID, 11, 3, echo = ec)

      """ block read/write (add/remove suffixes) """
      with stopwatch():
        r = dx.Read(ID, 120, 27, echo = ec)
      if r:
        print(f'Read({ID};128)=', tuple(iter_unpack('<HBBhhiiIIHB', r))[0])

      fffffd = bytes((
        0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
        0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
        0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
       ))
      with stopwatch():
        r = dx.Write(ID, 634, fffffd, echo = ec)
      print(f'Write({ID})={r}')
      with stopwatch():
        r = dx.Read(ID, 634, len(fffffd), echo = ec)
      if r:
        print(f'Read({ID})=', r.hex(':'))

      """ bulk read inst. """
      with stopwatch():
        d = dx.BulkRead((dx.TBulkR(1,0,10), dx.TBulkR(2,0,20), dx.TBulkR(3,0,30), dx.TBulkR(4,0,40), dx.TBulkR(5,0,40)), echo = ec)
      print('BulkRead=')
      for d in d:
        print(f' ({d[0]},0)', d[1].hex(':'))

      dx.Write8(ID, 64, 1, echo = ec)
      """ bulk write inst. """
      print('BulkWrite=')
      with stopwatch():
        print(' 1', dx.BulkWrite((dx.TBulkW(1,104, L2Bs((0,0,0,1024))), dx.TBulkW(2,65,B2Bs(0)), dx.TBulkW(3,65,B2Bs(0)), dx.TBulkW(4,65,B2Bs(0))), echo = ec))
      time.sleep(0.5)
      with stopwatch():
        print(' 2', dx.BulkWrite((dx.TBulkW(1,104, L2Bs(0)+L2Bs(0)+L2Bs(0)+L2Bs(2048)), dx.TBulkW(2,65,B2Bs(1)), dx.TBulkW(3,65,B2Bs(1)), dx.TBulkW(4,65,B2Bs(1))), echo = ec))
      time.sleep(0.5)
      with stopwatch():
        print(' 3', dx.BulkWrite((dx.TBulkW(1,104, L2Bs(0)+L2Bs(0)+L2Bs(0)+L2Bs(1024)), dx.TBulkW(2,65,B2Bs(0)), dx.TBulkW(3,65,B2Bs(0))), echo = ec))
      time.sleep(0.5)
      with stopwatch():
        print(' 4', dx.BulkWrite((dx.TBulkW(1,116, L2Bs(2048)), dx.TBulkW(2,65,B2Bs(1)), dx.TBulkW(3,65,B2Bs(1))), echo = ec))
      time.sleep(0.5)
      with stopwatch():
        print(' 5', dx.BulkWrite((dx.TBulkW(1,65,bytes(0)),dx.TBulkW(2,65,bytes(0)),dx.TBulkW(3,65,bytes(0))), echo = ec))

      dx.Write8(ID, 64, 0, echo = ec)

    except Exception as ex:
      pass
      import traceback
      print('--- Caught Exception ---')
      traceback.print_exc()
      print('------------------------')
      '''
      trace = []
      tb = ex.__traceback__
      while tb is not None:
        trace.append({
          "filename": tb.tb_frame.f_code.co_filename,
          "name": tb.tb_frame.f_code.co_name,
          "lineno": tb.tb_lineno
        })
        tb = tb.tb_next
      print(str({
        'type': type(ex).__name__,
        'message': str(ex),
        'trace': trace
      }))
      '''

    fin = True

  def func2(dx):
    global ID, fin, ec

    while not fin:
      try:
        l = dx.Read8(ID, 65, echo = ec)
        if l != None:
          l ^= 1
          dx.Write8(ID, 65, l, echo = ec)
        time.sleep(0.05)
      except Exception as ex:
        import traceback
        print('--- Caught Exception ---')
        traceback.print_exc()
        print('------------------------')
        return

  def func3(dx):
    global ID, fin, ec

    try:
      """ dump memory """
      l = 20
      for addr in range(0, 250, l):
        r = dx.Read(ID, addr, l, echo = ec)
        print(f'Read({addr};{l})=',r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!')

      '''
      """ reset dxl """
      with stopwatch():
        r = dx.FactoryReset(ID, echo = ec)
      print(f'FactoryReset({ID}): {r}')
      time.sleep(0.5)
      '''

      """ ping dxl """
      print(f'Ping({ID})=', dx.Ping(ID, echo = ec))

      """ reboot dxl """
      with stopwatch():
        r = dx.Reboot(ID, echo = ec)
      print(f'Reboot({ID})={r}')
      time.sleep(1)

      """ basic packet proc """
      with stopwatch():
        with dx.mutex:
          r0 = dx.TxPacket(ID, dx.INST_WRITE, (25, 1))
          r1 = dx.RxPacket()
      print(f'TxPacket({ID})={r0[1]}', r0[0].hex(':'))
      if r1[0]:
        print(f'RxPacket({ID})={r1[1]}', r1[0].hex(':'))

      """ read byte item """
      with stopwatch():
        r = dx.Read8(ID, 25, echo = ec)
      print(f'Read8({ID})={r}')

      """ sync write inst. """
      for i in range(30):
        with stopwatch():
          dx.SyncWrite(25, 1, (dx.TSyncW(1, B2Bs(1)), dx.TSyncW(2, B2Bs(1)), dx.TSyncW(3, B2Bs(1))), echo = ec)
        time.sleep(0.05)
        with stopwatch():
          dx.SyncWrite(25, 1, (dx.TSyncW(1, B2Bs(0)), dx.TSyncW(2, B2Bs(0)), dx.TSyncW(3, B2Bs(0))), echo = ec)
        time.sleep(0.05)

      """ set goal position """
      #torque off
      if dx.Write8(ID, 24, 0, echo = ec):
        #multi turn
        if dx.Write16(ID, 6, (4095, 4095), echo = ec):
          #torque on
          if dx.Write8(ID, 24, 1, echo = ec):
            print(f'Write16/Read16({ID})')
            for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)):
              for i in range(10):
                #set goal position
                with stopwatch():
                  dx.Write16(ID, 30, gp, echo = ec)
                #get present position
                with stopwatch():
                  pp = dx.Read16(ID, 36, signed = True, echo = ec)
                if pp != None:
                  print(f' w={gp:6} r={pp:6} diff={gp-pp:5}', end='\r')
                else:
                  pass
                  print('None                            ', end='\r')
                  break
                time.sleep(0.005)
              else:
                continue
              pass
              break
            print('')
            #torque off
            dx.Write8(ID, 24, 0, echo = ec)
            #normal turn
            with stopwatch():
              dx.Write16(ID, 6, (0, 4095), echo = ec)
    except Exception as ex:
      pass
      import traceback
      print('--- Caught Exception ---')
      traceback.print_exc()
      print('------------------------')

  import sys
  import platform
  print(sys.version)

  try:
    dx = DXLProtocolV2('\\\\.\\COM12', 3000000)
  except:
    pass
  else:
    th1 = Thread(target = func1, args = (dx,))
    th2 = Thread(target = func2, args = (dx,))
    th1.start()
    th2.start()
    th1.join()
    th2.join()
    del th1, th2, dx

  try:
    dx = DXLProtocolV1('\\\\.\\COM12', 57600)
  except:
    dx = None
  else:
    th3 = Thread(target = func3, args = (dx,))
    th3.start()
    th3.join()
    del th3, dx

  print('fin')
Page Top
Pythonその3 anchor.png

Python2ついでにMicroPythonの互換性レベルを図ってみた。
欲しいライブラリが用意されていない以上に、基本的な部分が結構緩い。また実際に動かすとそれなりに遅い。それでもMicroPythonは存在しているし、個人的にはかつてのBASICの感覚が蘇ってきたので、ちょっとした使い方であれば選択肢に入れても良いかなと。
ひとまずPicoSHIELD+Raspberry Pi Pico上で動くように修正してみた。オーバーロックしているのはご愛敬として、マルチスレッドを除外した程度で実質PC版と大差無い。@micropython.viperを多用した時はかなり高速化できていたのだが、ここはコードの互換性を優先。

# pyDXL.py for MicroPython(RP2040)
# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: (C) 2024 mukyokyo


from machine import UART, Pin
import time, array
from collections import namedtuple
from struct import pack, unpack

##########################################################
# Functionalized the part of converting int to bytes.
# If specified as a tuple, it is converted to bytes at once.
##########################################################
@micropython.native
def B2Bs(d) -> bytes:
  if isinstance(d, list) or isinstance(d, tuple):
    return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d)
  else:
    return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d))

@micropython.native
def W2Bs(d) -> bytes:
  if isinstance(d, list) or isinstance(d, tuple):
    return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]])
  else:
    return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d))

@micropython.native
def L2Bs(d) -> bytes:
  if isinstance(d, list) or isinstance(d, tuple):
    return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]])
  else:
    return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d))

##########################################################
# API for Dynamixel protocol V1
##########################################################
class DXLProtocolV1:
  BROADCASTING_ID = 0xfe
  INST_PING           = 0x01
  INST_READ           = 0x02
  INST_WRITE          = 0x03
  INST_REG_WRITE      = 0x04
  INST_ACTION         = 0x05
  INST_FACTORY_RESET  = 0x06
  INST_REBOOT         = 0x08
  INST_SYNC_WRITE     = 0x83
  INST_SYNG_REG_WRITE = 0x85

  TSyncW = namedtuple("TSyncW", ("id", ("data")))

  def __init__(self, baudrate = 57600, timeout_ms = 50):
    """
    Initalize

    parameters
    -------------
    port : int
      UART no.
    baudrate : int
      Serial baudrate[bps]
    timeout : float
      Read timeout[s]
    """
    self.__Error = 0
    self.__baudrate = baudrate
    self.__timeout = timeout_ms
    self.__serial = UART(1, baudrate, tx = Pin(4), rx = Pin(5), txbuf = 500, rxbuf = 500, timeout = timeout_ms)

  @property
  def baudrate(self):
    return self.__baudrate

  @baudrate.setter
  def baudrate(self, baudrate):
    self.__baudrate = baudrate
    self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout)

  @property
  def timeout(self):
    return self.__timeout

  @timeout.setter
  def timeout(self, timeout_ms):
    self.__timeout = timeout_ms
    self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout)

  @property
  def Error(self):
    return self.__Error

  @micropython.native
  def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> (bytes, bool):
    """
    Sending packets

    parameters
    -------------
    id : int
      Target ID
    inst : int
      Instruction command
    param : bytes
      Packet parameters

    Returns
    -------
    bytes
      Packets sent
    bool
      Success or failure
    """
    if ((id == self.BROADCASTING_ID) or (id >= 0 and id <= 253)) and len(param) <= (256 - 6):
      instp = bytearray([0xff,0xff,id,0,inst]) + bytes(param)
      instp[3] = len(instp) - 3
      instp += B2Bs(~sum(instp[2:]) & 0xff)
      _ = self.__serial.read(self.__serial.any())
      if echo: print('TX:', instp.hex(':'))
      self.__serial.write(instp)
      return bytes(instp), True
    return None, False

  @micropython.native
  def __rx(self, length) -> bytes:
    s = self.__serial.read(length)
    if s:
      l = len(s)
      if l == length:
        return s
      else:
        r = s
        length -= l
        while self.__serial.any() > 0:
          s = self.__serial.read(length)
          r += s
          length -= len(s)
          if length == 0:
            break
        return r
    else:
      return bytes()

  @micropython.native
  def RxPacket(self, echo = False) -> (bytes, bool):
    """
    Receiving packets

    Returns
    -------
    bytes
      Packets received
    bool
      Success or failure
    """
    self.__serial.flush()
    statp = self.__rx(5)
    if statp:
      if len(statp) == 5:
        l = statp[3] - 1
        self.__Error = statp[4]
        statp += self.__rx(l)
        if len(statp) == l + 5:
          if statp[-1:][0] == ((~sum(statp[2:-1])) & 0xff):
            if echo: print('RX:', statp.hex(':'))
            return bytes(statp), (statp[4] & 0x40) == 0
    return None, False

  @micropython.native
  def Write(self, id : int, addr : int, data : bytes, echo = False) -> bool:
    """
    Write instruction

    parameters
    -------------
    id : int
      Target ID
    addr : int
      Target item address
    data : bytes
      Data to be written

    Returns
    -------
    result : bool
      Success or failure
    """
    if id >= 0 and id <= self.BROADCASTING_ID and addr >= 0  and addr <= 254:
      if self.TxPacket(id, self.INST_WRITE, B2Bs(addr) + data, echo)[1]:
        if id != self.BROADCASTING_ID:
          dat, r = self.RxPacket(echo)
          if r:
            return dat[2] == id and (dat[4] & 0x18) == 0
        else:
          return True
    return False

  @micropython.native
  def Write8(self, id : int, addr : int, data : int | tuple | list, echo = False) -> bool:
    return self.Write(id, addr, B2Bs(data), echo)

  @micropython.native
  def Write16(self, id : int, addr : int, data : int | tuple | list, echo = False) -> bool:
    return self.Write(id, addr, W2Bs(data), echo)

  @micropython.native
  def Write32(self, id : int, addr : int, data : int | tuple | list, echo = False) -> bool:
    return self.Write(id, addr, L2Bs(data), echo)

  @micropython.native
  def Read(self, id : int, addr : int, length : int, echo = False) -> (bytes, bool):
    """
    Read instruction

    parameters
    -------------
    id : int
      Target ID
    addr : int
      Target item address
    length : int
      Number of bytes to read

    Returns
    -------
    bytes
      Data read
    bool
      Success or failure
    """
    if id >= 0 and id <= 253 and addr >= 0 and addr <= 254 and length > 0 and length <= (256 - 6):
      if self.TxPacket(id, self.INST_READ, B2Bs((addr, length)), echo)[1]:
        dat, r = self.RxPacket(echo)
        if r:
          if dat[2] == id and (dat[4] & 0x8) == 0:
            return bytes(dat[5:-1])
    return None

  @micropython.native
  def Read8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
    r = self.Read(id, addr, length, echo)
    if r != None:
      n = unpack('<' + (('b'*length) if signed else ('B'*length)), r[0:1 * length])
      return n if length > 1 else n[0]
    return None

  @micropython.native
  def Read16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
    r = self.Read(id, addr, 2 * length, echo)
    if r != None:
      n = unpack('<' + (('h'*length) if signed else ('H'*length)), r[0:2 * length])
      return n if length > 1 else n[0]
    return None

  @micropython.native
  def Read32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
    r = self.Read(id, addr, 4 * length, echo)
    if r != None:
      n = unpack('<' + (('i'*length) if signed else ('I'*length)), r[0:4 * length])
      return n if length > 1 else n[0]
    return None

  @micropython.native
  def SyncWrite(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool:
    """
    Sync Write instruction

    parameters
    -------------
    addr : int
      Target item address
    length : int
      Number of bytes to write
    id_datas : (TSyncW)
      Target ID and data

    Returns
    -------
    bool
      Success or failure
    """
    if addr >= 0 and addr <= 254 and length > 0 and length < (256 - 6):
      param = B2Bs((addr,length))
      for d in id_datas:
        param += B2Bs(d.id) + d.data
        if len(d.data) != length or d.id < 0 or d.id > 253:
          del param
          return False
      del param
      return self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)[1]
    return False

  @micropython.native
  def Ping(self, id : int, echo = False) -> bool:
    if self.TxPacket(id, self.INST_PING, bytes(), echo):
      dat, r = self.RxPacket(echo)
      if r:
        return id == dat[2] and dat[3] == 2
    return False

  @micropython.native
  def FactoryReset(self, id : int, echo = False) -> bool:
    if self.TxPacket(id, self.INST_RESET, bytes(), echo):
      dat, r = self.RxPacket(echo)
      if r:
        return id == dat[2] and dat[3] == 2
    return False

  @micropython.native
  def Reboot(self, id : int, echo = False) -> bool:
    if self.TxPacket(id, self.INST_REBOOT, bytes(), echo):
      dat, r = self.RxPacket(echo)
      if r:
        return id == dat[2] and dat[3] == 2
    return False

##########################################################
# API for Dynamixel protocol V2
##########################################################
class DXLProtocolV2:
  BROADCASTING_ID = const(0xfe)
  INST_PING                 = const(0x01)
  INST_READ                 = const(0x02)
  INST_WRITE                = const(0x03)
  INST_REG_WRITE            = const(0x04)
  INST_ACTION               = const(0x05)
  INST_FACTORY_RESET        = const(0x06)
  INST_REBOOT               = const(0x08)
  INST_SYS_WRITE            = const(0x0d)
  INST_CLEAR                = const(0x10)
  INST_CONTROL_TABLE_BACKUP = const(0x20)
  INST_STATUS               = const(0x55)
  INST_SYNC_READ            = const(0x82)
  INST_SYNC_WRITE           = const(0x83)
  INST_SYNG_REG_WRITE       = const(0x85)
  INST_FAST_SYNC_READ       = const(0x8a)
  INST_BULK_READ            = const(0x92)
  INST_BULK_WRITE           = const(0x93)
  INST_FAST_BULK_READ       = const(0x9a)

  TSyncW = namedtuple("TSyncW", ("id", ("data")))
  TBulkW = namedtuple("TBulkW", ("id", "addr", ("data")))
  TBulkR = namedtuple("TBulkR", ("id", "addr", "length"))

  __crc16_lutable = array.array('H', (
    0x0000, 0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014, 0x8011,
    0x8033, 0x0036, 0x003C, 0x8039, 0x0028, 0x802D, 0x8027, 0x0022,
    0x8063, 0x0066, 0x006C, 0x8069, 0x0078, 0x807D, 0x8077, 0x0072,
    0x0050, 0x8055, 0x805F, 0x005A, 0x804B, 0x004E, 0x0044, 0x8041,
    0x80C3, 0x00C6, 0x00CC, 0x80C9, 0x00D8, 0x80DD, 0x80D7, 0x00D2,
    0x00F0, 0x80F5, 0x80FF, 0x00FA, 0x80EB, 0x00EE, 0x00E4, 0x80E1,
    0x00A0, 0x80A5, 0x80AF, 0x00AA, 0x80BB, 0x00BE, 0x00B4, 0x80B1,
    0x8093, 0x0096, 0x009C, 0x8099, 0x0088, 0x808D, 0x8087, 0x0082,
    0x8183, 0x0186, 0x018C, 0x8189, 0x0198, 0x819D, 0x8197, 0x0192,
    0x01B0, 0x81B5, 0x81BF, 0x01BA, 0x81AB, 0x01AE, 0x01A4, 0x81A1,
    0x01E0, 0x81E5, 0x81EF, 0x01EA, 0x81FB, 0x01FE, 0x01F4, 0x81F1,
    0x81D3, 0x01D6, 0x01DC, 0x81D9, 0x01C8, 0x81CD, 0x81C7, 0x01C2,
    0x0140, 0x8145, 0x814F, 0x014A, 0x815B, 0x015E, 0x0154, 0x8151,
    0x8173, 0x0176, 0x017C, 0x8179, 0x0168, 0x816D, 0x8167, 0x0162,
    0x8123, 0x0126, 0x012C, 0x8129, 0x0138, 0x813D, 0x8137, 0x0132,
    0x0110, 0x8115, 0x811F, 0x011A, 0x810B, 0x010E, 0x0104, 0x8101,
    0x8303, 0x0306, 0x030C, 0x8309, 0x0318, 0x831D, 0x8317, 0x0312,
    0x0330, 0x8335, 0x833F, 0x033A, 0x832B, 0x032E, 0x0324, 0x8321,
    0x0360, 0x8365, 0x836F, 0x036A, 0x837B, 0x037E, 0x0374, 0x8371,
    0x8353, 0x0356, 0x035C, 0x8359, 0x0348, 0x834D, 0x8347, 0x0342,
    0x03C0, 0x83C5, 0x83CF, 0x03CA, 0x83DB, 0x03DE, 0x03D4, 0x83D1,
    0x83F3, 0x03F6, 0x03FC, 0x83F9, 0x03E8, 0x83ED, 0x83E7, 0x03E2,
    0x83A3, 0x03A6, 0x03AC, 0x83A9, 0x03B8, 0x83BD, 0x83B7, 0x03B2,
    0x0390, 0x8395, 0x839F, 0x039A, 0x838B, 0x038E, 0x0384, 0x8381,
    0x0280, 0x8285, 0x828F, 0x028A, 0x829B, 0x029E, 0x0294, 0x8291,
    0x82B3, 0x02B6, 0x02BC, 0x82B9, 0x02A8, 0x82AD, 0x82A7, 0x02A2,
    0x82E3, 0x02E6, 0x02EC, 0x82E9, 0x02F8, 0x82FD, 0x82F7, 0x02F2,
    0x02D0, 0x82D5, 0x82DF, 0x02DA, 0x82CB, 0x02CE, 0x02C4, 0x82C1,
    0x8243, 0x0246, 0x024C, 0x8249, 0x0258, 0x825D, 0x8257, 0x0252,
    0x0270, 0x8275, 0x827F, 0x027A, 0x826B, 0x026E, 0x0264, 0x8261,
    0x0220, 0x8225, 0x822F, 0x022A, 0x823B, 0x023E, 0x0234, 0x8231,
    0x8213, 0x0216, 0x021C, 0x8219, 0x0208, 0x820D, 0x8207, 0x0202
  ))

  def __init__(self, baudrate = 57600, timeout_ms = 50):
    """
    Initalize

    parameters
    -------------
    port : int
      UART no.
    baudrate : int
      Serial baudrate[bps]
    timeout : float
      Read timeout[s]
    """
    self.__Error = 0
    self.__baudrate = baudrate
    self.__timeout = timeout_ms
    self.__serial = UART(1, baudrate, tx = Pin(4), rx = Pin(5), txbuf = 500, rxbuf = 500, timeout = timeout_ms)

  @property
  def baudrate(self):
    return self.__baudrate

  @baudrate.setter
  def baudrate(self, baudrate):
    self.__baudrate = baudrate
    self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout)

  @property
  def timeout(self):
    return self.__timeout

  @timeout.setter
  def timeout(self, timeout_ms):
    self.__timeout = timeout_ms
    self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout)

  @property
  def Error(self):
    return self.__Error

  @micropython.viper
  def __crc16(self, dat : ptr8, l:int) -> int:
    crc:int = 0
    pt = ptr16(self.__crc16_lutable)
    for i in range(l):
      crc = (crc << 8) ^ pt[(((crc >> 8) ^ dat[i]) & 0xff)]
    return crc & 0xffff

  @micropython.native
  def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> (bytes, bool):
    """
    Sending packets

    parameters
    -------------
    id : int
      Target ID
    inst : int
      Instruction command
    param : bytes
      Packet parameters

    Returns
    -------
    bytes
      Packets sent
    bool
      Success or failure
    """
    if ((id == self.BROADCASTING_ID) or (id >= 0 and id <= 252)) and len(param) < (65536 - 10):
      instp = bytearray([0xff,0xff,0xfd,0x00,id,0,0,inst]) + bytearray(param).replace(b'\xff\xff\xfd',b'\xff\xff\xfd\xfd')
      instp[5:7] = W2Bs(len(instp) - 5)
      instp += W2Bs(self.__crc16(instp, len(instp)))
      _ = self.__serial.read(self.__serial.any())
      if echo: print('TX:', instp.hex(':'))
      self.__serial.write(instp)
      return bytes(instp), True
    return None, False

  @micropython.native
  def __rx(self, length) -> bytes:
    s = self.__serial.read(length)
    if s:
      l = len(s)
      if l == length:
        return s
      else:
        r = s
        length -= l
        while self.__serial.any() > 0:
          s = self.__serial.read(length)
          r += s
          length -= len(s)
          if length == 0:
            break
        return r
    else:
      return bytes()

  @micropython.native
  def RxPacket(self, echo = False) -> (bytes, bool):
    """
    Receiving packets

    Returns
    -------
    bytes
      Packets received
    bool
      Success or failure
    """
    self.__serial.flush()
    statp = self.__rx(9)
    if statp:
      if len(statp) == 9:
        if statp[0] == 0xff and statp[1] == 0xff and statp[2] == 0xfd and statp[3] == 0 and statp[7] == 0x55:
          l = unpack('<H', statp[5:7])[0] - 2
          self.__Error = statp[8]
          statp += self.__rx(l)
          if len(statp) == l + 9:
            if unpack('<H', statp[-2:])[0] == self.__crc16(statp[:-2], len(statp[:-2])):
              statp = statp[0:9] + statp[9:].replace(b'\xff\xff\xfd\xfd',b'\xff\xff\xfd')
              if echo: print('RX:', statp.hex(':'))
              return bytes(statp), (statp[8] & 0x7f) == 0
    return None, False

  @micropython.native
  def Write(self, id : int, addr : int, data: bytes, echo = False) -> bool:
    """
    Write instruction

    parameters
    -------------
    id : int
      Target ID
    addr : int
      Target item address
    data : bytes
      Data to be written

    Returns
    -------
    result : bool
      Success or failure
    """
    if ((id >= 0 and id <= 252) or (id == self.BROADCASTING_ID)) and addr >= 0 and addr <= 65535:
      if self.TxPacket(id, self.INST_WRITE, W2Bs(addr) + data, echo)[1]:
        if id != self.BROADCASTING_ID:
          dat, r = self.RxPacket(echo)
          if r:
            return dat[4] == id and (dat[8] & 0x7f) == 0
          else:
            return True
    return False

  @micropython.native
  def Write8(self, id : int, addr : int, data : int | tuple | list, echo = False):
    return self.Write(id, addr, B2Bs(data), echo)

  @micropython.native
  def Write16(self, id : int, addr : int, data : int | tuple | list, echo = False):
    return self.Write(id, addr, W2Bs(data), echo)

  @micropython.native
  def Write32(self, id : int, addr : int, data : int | tuple | list, echo = False):
    return self.Write(id, addr, L2Bs(data), echo)

  @micropython.native
  def Read(self, id : int, addr : int, length : int, echo = False) -> (bytes, bool):
    """
    Read instruction

    parameters
    -------------
    id : int
      Target ID
    addr : int
      Target item address
    length : int
      Number of bytes to read

    Returns
    -------
    bytes
      Data read
    bool
      Success or failure
    """
    if id >= 0 and id <= 252 and addr >= 0 and addr <= 65535 and length > 0 and length < (65536 - 10):
      if self.TxPacket(id, self.INST_READ, W2Bs(addr) + W2Bs(length), echo)[1]:
        dat, r = self.RxPacket(echo)
        if r:
          return bytes(dat[9:-2])
    return None

  @micropython.native
  def Read8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
    r = self.Read(id, addr, length, echo)
    if r:
      n = unpack('<' + (('b'*length) if signed else ('B'*length)), r[0:1 * length])
      return n if length > 1 else n[0]
    return None

  @micropython.native
  def Read16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
    r = self.Read(id, addr, 2 * length, echo)
    if r != None:
      n = unpack('<' + (('h'*length) if signed else ('H'*length)), r[0:2 * length])
      return n if length > 1 else n[0]
    return None

  @micropython.native
  def Read32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
    r = self.Read(id, addr, 4 * length, echo)
    if r != None:
      n = unpack('<' + (('i'*length) if signed else ('I'*length)), r[0:4 * length])
      return n if length > 1 else n[0]
    return None

  @micropython.native
  def SyncWrite(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool:
    """
    Sync Write instruction

    parameters
    -------------
    addr : int
      Target item address
    length : int
      Number of bytes to write
    id_datas : (TSyncW)
      Target ID and data

    Returns
    -------
    bool
      Success or failure
    """
    if addr >= 0 and addr <= 65535 and length > 0 and length < (65536 - 10):
      param = W2Bs(addr) + W2Bs(length)
      for d in id_datas:
        param += bytes((d.id,)) + d.data
        if len(d.data) != length or d.id < 0 or d.id > 252:
          del param
          return False
      return self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)[1]
    return False

  @micropython.native
  def SyncRead(self, addr : int, length : int, ids : (int), echo = False) -> tuple:
    """
    Sync Read instruction

    parameters
    -------------
    addr : int
      Target item address
    length : int
      Number of bytes to read
    ids : (int)
      Target IDs

    Returns
    -------
    tuple
      Read ID and data
    """
    result = ()
    if addr >= 0 and addr < 65535 and length > 0 and length < (65536 - 10):
      if self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_READ, W2Bs(addr) + W2Bs(length) + B2Bs(ids), echo)[1]:
        for id in ids:
          dat, r = self.RxPacket(echo)
          if r:
            if dat[4] == id:
              result += (id, bytes(dat[9:9 + length])),
            else:
              result += (id, bytes([])),
          else:
            result += (id, bytes([])),
    return result

  @micropython.native
  def BulkWrite(self, data : (TBulkW), echo = False) -> bool:
    """
    Bulk Write instruction

    parameters
    -------------
    addr : int
      Target item address
    length : int
      Number of bytes to write
    ids : (TSyncW)
      Target ID and data

    Returns
    -------
    bool
      Success or failure
    """
    param = bytes()
    for d in data:
      if d.id >= 0 and d.id <= 252 and d.addr >= 0 and d.addr <= 65535:
        param += B2Bs(d.id) + W2Bs(d.addr) + W2Bs(len(d.data)) + d.data
      else:
        del param
        return False
    return self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_WRITE, param, echo)[1]

  @micropython.native
  def BulkRead(self, data:(TBulkR), echo = False) -> tuple:
    """
    Bulk Read instruction

    parameters
    -------------
    data : (TBulkR)
      ID, address, and number of bytes to be read

    Returns
    -------
    tuple
      Read ID and data
    """
    result = ()
    param = bytes()
    for d in data:
      if d.id < 0 or d.addr < 0 or d.length < 0:
        return result
      param += B2Bs(d.id) + W2Bs(d.addr) + W2Bs(d.length)
    if self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_READ, param, echo)[1]:
      for d in data:
        rxd, r = self.RxPacket(echo)
        if r:
          if(d.id == rxd[4]):
            result += (d.id, bytes(rxd[9:-2])),
          else:
            result += (d.id, bytes([])),
        else:
          result += (d.id, bytes([])),
    del param
    return result

  @micropython.native
  def Ping(self, id : int, echo = False) -> bool:
    if self.TxPacket(id, self.INST_PING, bytes(), echo)[1]:
      rxd, r = self.RxPacket(echo)
      if r:
        return id == rxd[4] and rxd[5] == 7 and rxd[6] == 0
    return False

  @micropython.native
  def FactoryReset(self, id : int, p1 : int, echo = False) -> bool:
    if self.TxPacket(id, self.INST_FACTORY_RESET, B2Bs(p1), echo)[1]:
      rxd, r = self.RxPacket(echo)
      if r:
        return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0
    return False

  @micropython.native
  def Reboot(self, id : int, echo = False) -> bool:
    if self.TxPacket(id, self.INST_REBOOT, bytes(), echo)[1]:
      rxd, r = self.RxPacket(echo)
      if r:
        return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0
    return False

  @micropython.native
  def Clear(self, id : int, val, echo = False) -> bool:
    if self.TxPacket(id, self.INST_CLEAR, B2Bs(1) + L2Bs(val), echo)[1]:
      rxd, r = self.RxPacket(echo)
      if r:
        return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0
    return False

##########################################################
# test code
##########################################################
if __name__ == "__main__":
  import uasyncio as asyncio
  from ucontextlib import contextmanager
  import gc

  try:
    led = Pin('LED', Pin.OUT)
  except TypeError:
    led = Pin(25, Pin.OUT)
  led.off()

  dx = None
  ec = False
  ID = 1
  fin = False

  async def test1():
    global dx, ec, ID, fin
    @contextmanager
    def stopwatch():
      led.toggle()
      start_time = time.ticks_us()
      yield
      #print('..proc time={:.2f}ms {}'.format((time.ticks_us() - start_time) / 1000, gc.mem_alloc()))

    # Basic operation check
    try:
      '''
      with stopwatch():
        r = dx.FactoryReset(ID, echo=ec)
      print(f'Reset({ID}): {r}')
      dx.baudrate = 57600
      ID = 1
      await asyncio.sleep(0.5)
      '''

      """ ping dxl """
      for i in range(10):
        with stopwatch():
          print(f'Ping({i}):', dx.Ping(i, echo=ec))

      """ reboot dxl """
      with stopwatch():
        r = dx.Reboot(ID, echo=ec)
      print(f'Reboot({ID})= {r}')
      await asyncio.sleep(0.5)

      """ basic packet proc """
      with stopwatch():
        r0 = dx.TxPacket(ID, dx.INST_WRITE, (65, 0, 1))
        r1 = dx.RxPacket()
      print('TxPacket({})= {} {}'.format(ID, r0[1], r0[0].hex(':') if r0[0] else '--'))
      print('RxPacket({})= {} {}'.format(ID, r1[1], r1[0].hex(':') if r1[0] else '--'))

      """ dump memory """
      l = 50
      for addr in range(0, 700, l):
        r = dx.Read(ID, addr, l, echo = ec)
        print(f'Read({addr};{l})=',r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!')

      """ read byte item """
      with stopwatch():
        r = dx.Read8(ID, 65, echo=ec)
      print(f'Read8({ID},65)= {r}')
      """ read word item """
      with stopwatch():
        r = dx.Read16(ID, 120, echo=ec)
      print(f'Read16({ID},120)= {r}')
      """ read long item """
      with stopwatch():
        r = dx.Read32(ID, 132, echo=ec)
      print(f'Read32({ID},132)= {r}')

      """ sync read inst. """
      with stopwatch():
        d = dx.SyncRead(0, 4, (1, 2, 3, 4, 5, 6, 7, 8), echo=ec)
      print('SyncRead=')
      for d in d:
        print(f' ({d[0]},0)', d[1].hex(':') if len(d[1]) > 0 else ())

      """ sync write inst. """
      for i in range(20):
        with stopwatch():
          dx.SyncWrite(65, 1, (dx.TSyncW(1, B2Bs(1)), dx.TSyncW(2, B2Bs(1)), dx.TSyncW(3, B2Bs(1))), echo=ec)
        await asyncio.sleep(0.05)
        with stopwatch():
          dx.SyncWrite(65, 1, (dx.TSyncW(1, B2Bs(0)), dx.TSyncW(2, B2Bs(0)), dx.TSyncW(3, B2Bs(0))), echo=ec)
        await asyncio.sleep(0.05)

      """ set goal position """
      #torque off
      with stopwatch():
        dx.Write8(ID, 64, 0, echo=ec)
      #multi turn
      with stopwatch():
        dx.Write8(ID, 11, 4, echo=ec)
      #torque on
      with stopwatch():
        dx.Write8(ID, 64, 1, echo=ec)
      for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)):
        #set goal potition
        with stopwatch():
          dx.Write32(ID, 116, gp, echo=ec)
        #get present potition
        with stopwatch():
          pp = dx.Read32(ID, 132, signed = True, echo=ec)
        if pp:
          print(f'Goal Pos={gp:6d}, Present Pos={pp:6d}', end='\r')
        else:
          break
        await asyncio.sleep(0.05)
      else:
        print('')
      #torque off
      with stopwatch():
        dx.Write8(ID, 64, 0, echo=ec)
      #normal turn
      with stopwatch():
        dx.Write8(ID, 11, 3, echo=ec)

      """ test add/remove suffixes """
      fffffd = bytes((
        0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
        0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
        0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
      ))
      with stopwatch():
        r = dx.Write(ID, 634, fffffd, echo=ec)
      print(f'Write({ID},634)= {r}')
      with stopwatch():
        r = dx.Read(ID, 634, len(fffffd), echo=ec)
      print(f'Read({ID},634)=', r.hex(':') if r else '--')

      """ bulk read inst. """
      with stopwatch():
        d = dx.BulkRead((dx.TBulkR(1, 0, 10), dx.TBulkR(2, 0, 20), dx.TBulkR(3, 0, 30), dx.TBulkR(4, 0, 40), dx.TBulkR(6, 0, 50)), echo=ec)
      print('BulkRead=')
      for d in d:
        print(f' ({d[0]},0)', d[1].hex(':'))

      """ bulk write inst. """
      dx.Write8(ID, 64, 1, echo=ec)
      with stopwatch():
        dx.BulkWrite((dx.TBulkW(1, 104, L2Bs((0,0,0,1024))), dx.TBulkW(2, 65, B2Bs(0)), dx.TBulkW(3, 65, B2Bs(0))), echo=ec)
      await asyncio.sleep(0.5)
      with stopwatch():
        dx.BulkWrite((dx.TBulkW(1, 104, L2Bs(0)+L2Bs(0)+L2Bs(0)+L2Bs(2048)), dx.TBulkW(2, 65, B2Bs(1)), dx.TBulkW(3, 65, B2Bs(1))), echo=ec)
      await asyncio.sleep(0.5)
      with stopwatch():
        dx.BulkWrite((dx.TBulkW(1, 104, L2Bs(0)+L2Bs(0)+L2Bs(0)+L2Bs(1024)), dx.TBulkW(2, 65, B2Bs(0)), dx.TBulkW(3, 65, B2Bs(0))), echo=ec)
      await asyncio.sleep(0.5)
      with stopwatch():
        dx.BulkWrite((dx.TBulkW(1, 116, L2Bs(2048)), dx.TBulkW(2,65, B2Bs(1)), dx.TBulkW(3, 65, B2Bs(1))), echo=ec)
      await asyncio.sleep(0.5)
      with stopwatch():
        dx.BulkWrite((dx.TBulkW(1, 65, B2Bs(0)), dx.TBulkW(2, 65, B2Bs(0)), dx.TBulkW(3, 65, B2Bs(0))), echo=ec)
      dx.Write8(ID, 64, 0, echo=ec)

    except Exception as ex:
      print('--- Caught Exception ---')
      import sys
      sys.print_exception(ex)
      print('------------------------')

    fin = True

  async def test2():
    global dx, ec, ID, fin
    await asyncio.sleep(0.5)
    while not fin:
      try:
        dx.Write8(ID, 65, dx.Read8(ID, 65, echo=ec) ^ 1, echo=ec)
        await asyncio.sleep(0.02)
      except Exception as ex:
        print('--- Caught Exception ---')
        import sys
        sys.print_exception(ex)
        print('------------------------')

  def test3():
    global dx, ec, ID, fin

    # Basic operation check
    print('reset=', dx.Reboot(1, echo=ec))
    time.sleep(2)

    print('ping=', dx.Ping(1, echo=ec))

    dx.TxPacket(1, dx.INST_READ, (0x2b, 0x01), True)
    dx.RxPacket(True)

    r = dx.Read(1, 0, 70, echo=ec)
    print(r.hex(','))
    print('len=',len(r))

    for i in range(15):
      led = dx.Read8(1, 25, echo=ec) ^ 1
      dx.Write8(1, 25, led, echo=ec)
      print(f'led={led}', end = '\r')
      time.sleep(0.2)
    print()

    dx.Write8(1, 24, 0, echo=ec)
    dx.Write16(1, 6, (4095, 4095), echo=ec)

    for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)):
      dx.Write16(1, 30, gp, echo=ec)
      pp = dx.Read16(1, 36, signed=True, echo=ec)
      print(f'Goal Pos={gp:6d}, Present Pos={pp:6d}', end='\r')
      time.sleep(0.05)
    else:
      print('')

    dx.Write8(1, 24, 0, echo=ec)
    dx.Write16(1, 6, (0, 4095),echo=ec)

  machine.freq(250000000) # set the CPU frequency to 250 MHz

  dx = DXLProtocolV2(baudrate = 3000000)
  loop = asyncio.get_event_loop()
  loop.create_task(test1())
  loop.create_task(test2())
  loop.run_forever()

  time.sleep(1)
  dx = DXLProtocolV1(baudrate = 57600)
  test3()

Front page   Diff ReloadPrint View   Page list Search Recent changes   RSS of recent changes (RSS 1.0) RSS of recent changes (RSS 2.0) RSS of recent changes (RSS Atom)
Last-modified: 2024-01-09 (Tue) 23:24:03 (JST) (277d)