様々なアプリケーションにおいてDYNAMIXELでつまづかないよう、最低限運転できるところまでの流れを本ページにまとめた。PCの操作やOSについても説明をしなくてはならない事も多々あるが、ここでは「ひとまず」動かせるところまでを一気に紹介する事で、分かった気になってもらう事に主眼を置いた。
そのような中においても多少なりともTipをちりばめてあるので、最終的にはそれらを踏まえたシステムを構築して欲しい。
縦に長くなって閲覧しにくいかもしれないが予めご了承の程。また別ページへのリンクをクリックするとブラウザ上ではページが切り替わるが、何度も往来しているうちに元に戻るのが億劫になる事がある。リンク先を参照する際は「Ctrl」キーを押しながらクリックして新規タブで開けば、多少なりとも閲覧効率が良くなるかも知れない。
※ここに記述された内容は断り無く更新される。
DYNAMIXELは主に2種類のI/Fが規定され、3ピンコネクタ版を俗にTTL版、4ピンコネクタ版をRS-485版と呼んでおり、電気的には相互互換性は無い。ほとんどのDYNAMIXELにはいずれかのタイプのコネクタが2個装備されている。
まずこの2種類のI/Fのコネクタを紹介する。
端子番号 | 信号名 | |
1 | GND | 電源のマイナス側 シリアル通信のGNDと共有 |
2 | VDD | 電源のプラス側 大半のDYNAMIXELは12V |
3 | TTL Signal | シリアル通信の信号 双方向 |
端子番号 | 信号名 | |
1 | GND | 電源のマイナス側 シリアル通信のGNDと共有 |
2 | VDD | 電源のプラス側 大半のDYNAMIXELはDC12V |
3 | RS-485 D+ | シリアル通信のプラス側差動信号 双方向 |
4 | RS-485 D- | シリアル通信のマイナス側差動信号 双方向 |
紹介したTTL及びRS-485の2種類のI/Fは一般的なPCには標準装備されていないため、USBポートを介して増設するための装置が用意されている。複数のI/Fに対応していたり、いずれか1つのI/Fにのみ対応していたり、電源の供給を行えたりとラインナップが複数あるため、以下に現行品のみの比較表を示す。
USB2RS485 dongle | USB2TTL dongle | USB2DXIF dongle | USB2DXIF | Starter Kit A | Starter Kit B | PicoSHIELD | DXHUB | U2D2 | |
Manufacture | BestTechnology | ROBOTIS | |||||||
USB | USB2.0 FS USB Type-A | USB2.0 FS USB Type-C | USB2.0 FS USB micro-B | USB2.0 FS USB Type-C | USB2.0 FS micro USB Type-B | USB2.0 HS USB Type-C | USB2.0 HS micro USB Type-B | ||
FTDI Chip | FT234X | FT231XS | FT234X | RP2040 | FT232H | ||||
Max DTR Rate [Mbps] | 3 | 3 | 12 | 6 | |||||
DYNAMIXEL I/F | RS-485 x1 | TTL x1 | RS-485 x1, TTL x1 | RS-485 x1, TTL x1 | RS-485 x5, TTL x5 | RS-485 x4, TTL x4 | RS-485 x1 | RS-485 x6, TTL x6 | RS-485 x1, TTL x2 |
Isolation | Yes | No | Yes | No | |||||
Dimension [mm] | 43.2x13.0x8.5 | 18.0x12.7x9 | 40x40 | 51x23.5 | 52x33x15 | 48x18x14.6 | |||
Other spec. | Small | Tiny | Power distribution | Plastic enclosure | |||||
Software compatibility | DYNAMIXEL Wizard 2.0, R+2.0, DXLIB, DX2LIB, DYNAMIXEL SDK |
ここで紹介していない古いI/F製品は概ね旧来のDYNAMIXELを前提としており、装備されているコネクタがmolex社製のものが主である。もちろんConvertible Cableを仲介させる事で現行モデルのDYNAMIXELにも接続できるので、持っているのであれば買い直す必要は無い。
現時点ではAX・MX・X・Pシリーズから選定する事になるが、シリーズごとの大雑把な特徴を紹介しておく。
MXとXシリーズのストールトルクと無負荷最大回転数をモデルごとにプロットしたグラフを紹介しておく。
アプリケーションに要求されるトルクで選定するのが一般的なのだが、DYNAMIXELの大半のモデルは定格トルクではなくストールトルクを謳っている。ストールトルクは物理的にこの値を超えて出力されることは無い値(瞬時最大トルクに近い)であり、連続的に使って良いという意味では無い。
なおここではあえてXシリーズ以降を前提とするので、AXやMXシリーズまたは廃盤品を使用する場合は差異を各自で深掘りした上で適宜読み替えてもらうものとする。また廃盤品を含む全ラインナップの一覧はこちらに掲載した。
各USB I/Fを使用した場合の実態配線を紹介する。
いずれも複数のDYNAMIXELのコネクタをお互いに並列接続し、更にそこへUSB I/Fにて増設されるTTLないしRS-485 I/Fの信号と外部電源を接続する事が主な目的である。
なおmolex社のコネクタを装備したDYNAMIXELを使用する場合はJST社とmoelx社のコネクタを変換するConvertible Cableが必要だが、ここではXシリーズを前提で書き進めるので詳説しない。
3ピンのTTLと4ピンのRS-485 I/Fのコネクタが複数備わっている。更にDYNAMIXELへ電源を分配供給する機能があり、その電源の入力用コネクタが装備されている。各装置をDXHUBへ接続すると基本的に接続が完結してしまう。
同一電源で構わないのであればTTLとRS-485 I/Fを装備したDYNAMIXELを同時に運用できる。
4ピンのRS-485 I/Fのコネクタが1つのみ装備されており、DYNAMIXELへの電源の供給機能は無い。別途DXSharingBoard(RS485)等の分配基板を用意する事でDYNAMIXELへの電源の供給と複数台のDYNAMIXELの接続ができる。
この例ではPCとDYNAMIXEL間の距離はRobot Cable X4Pに支配されるが、USBドングルとPC間の接続を市販のUSB延長ケーブル等で中継する事で延長する事ができる。
更に正しく処置したRS-485ケーブルを作成する事で、USBドングルとDYNAMIXEL間の距離を延長できる。
3ピンのTTL I/Fのコネクタが1つのみ装備されており、DYNAMIXELへの電源の供給機能は無い。別途DXSharingBoard(TTL)等の分配基板を用意する事でDYNAMIXELへの電源の供給と複数台のDYNAMIXELの接続ができる。
この例ではPCとDYNAMIXEL間の距離はRobot Cable X3Pに支配されるが、USBドングルとPC間の接続を市販のUSB延長ケーブル等で中継する事で延長する事ができる。
USB2RS485 dongleとUSB2TTL dongleの機能を合わせ持っており、3ピンのTTL I/Fと4ピンのRS-485 I/Fのコネクタがそれぞれ1つずつ装備されている。
そもそもUSB2RS485 dongleとUSB2TTL dongleとUSB2DXIF dongleはコネクタの実装状況が異なる以外は全く同じ代物であるので、USB2RS485 dongleないしUSB2TTL dongleを参考に接続が行える。
4ピンのRS-485 I/Fのコネクタが1つ、3ピンのTTL I/Fのコネクタが2つ装備されており、DYNAMIXELへの電源の供給機能は無い。別途U2D2 PHBやDXSharingBoard等の分配基板を用意する事でDYNAMIXELへの電源の供給と複数台のDYNAMIXELの接続ができる。
TTLとRS-485 I/Fを同時に運用できる。
なおU2D2 PHBの電源供給用のジャックはROBOTIS社のキットに同梱されるSMPSのサイズに合わせてあり、日本国内で多く流通するものには適合しない。
TTL I/Fのシリアル信号は極めて微弱かつ脆弱で、距離を延長する事ができないまでか、DYNAMIXELの電気回路を容易に故障させる可能性すらある。弊社ではTTL I/Fの選択肢しかないモデルを除き、堅牢なシステムにおいてはRS-485 I/Fを搭載したモデルを強く推奨する(モデル名のサフィックスに「-R」と記載)。
また単体製品及びDYNAMIXELに同梱されているRobot Cableはケーブルを製作せずともすぐに接続できる事を念頭に用意されており、一般的に考え得る電気的な問題を解消できるものでは無い。
特にRS-485 I/Fの実力を発揮させるためには既成のRobot Cableは適さないため、電源容量と延長距離に対応したケーブルの製作を推奨する。
RS-485の終端抵抗の処置も行っておきたいところだ。
先の図では電源装置をSMPS等と表記しているが、任意電圧に設定できる電源装置やバッテリを負荷が要求する電圧と電流に合わせて用意する。しかしDYNAMIXELに内蔵されたモータの回生電力によって電圧が上昇してしまう事までは注意が行き届かないと思う。特に固定出力の一般的なスイッチングACアダプタには出力電圧と電流は規定されているが、出力側の耐圧やシンク能力までを明記しているものはまず無い。少なくとも誘導性負荷に対応しているものを選定しないと容易に故障する。
また電源からの距離をかせぐための延長ケーブルは、平行線ではなくツイストペアケーブル(より対線)を使うべきだ。
回路上PC・USBポート・USB I/F・DYNAMIXELの全てのGNDが共通になっている必要があるが、DYNAMIXEL側で電流サージが発生したりPCとDYNAMIXELに供給されている電源自体に大きな電位差があったりすると、GNDの電位が共通である事による問題が生じる場合がある。
大抵のPCのUSBポートには過電流保護機能が備わっているので、コネクタの抜き差し時に発生する電流サージ程度には耐性がある。しかしモータから生じるサージとなると、保護回路では太刀打ちできないレベルのものが発生する事があり得る。またUSBポートの過電流保護機能が働くとUSBポートが一時的に機能を失い、それを想定していないプログラムやOSであれば容易に暴走する。
更に各装置に供給される電源の電位差はサージ過電流以上に厄介で、瞬間的なものではなく定常的に過電流状態が維持されてしまい、あらゆる装置の永久破壊につながる。
いずれの問題にも幾度となく遭遇し、壊したPCやI/Fが何台ある事か。。。
そういった諸問題を解消する目的で、USB2RS485 dongle・USB2TTL dongle・DXHUBはいずれもUSBポートとDYNAMIXEL用I/F及び電源の間を電気的に絶縁している。
なお以後の解説では接続の容易さを買って絶縁機能を持ったDXHUBを使用する。
DYNAMIXELに関係するID番号・通信速度・モデル番号・コントロールテーブル・ファームウェアといったキーワードがあるが、大半はDYNAMIXELの外観から判別できない内部情報であり、後述するPC用のツールを用いる事で初めて認識できる。
電源・電源ケーブル・USBケーブルは各製品に標準添付されていないため、必要に応じて別途調達する。
接続は先に紹介したDXHUBの場合の通りとなるが、初期設定の場合は混乱を避けるため1台のみのDYNAMIXELを接続する。
DXHUBをPCのUSBポートに接続してデバイスドライバのインストールを行った後に、こちらの情報を元に仮想COMポートの設定を予め変更しておく事を推奨する。
またDXHUBの裏側が不用意に何かに触れてショートすると取り返しの付かない事になるため、予め四隅の取り付け穴にスペーサ等を取り付けて端子等がショートしないようにしておくと良いだろう。
DYNAMIXEL Wizard 2.0はDYNAMIXELとコミュニケーションを行うソフトウェアで、主に以下の機能を有する。
DYNAMIXELの専用通信プロトコルを用いて対象のDYNAMIXELのコントロールテーブルにアクセスしているに過ぎないが、このようなツールを使用しない場合はちょっとした設定変更をするにしても自前でプログラムを作らざるを得ない。ROBOTIS社のサイトからダウンロードしてインストールしておく。
なおDYNAMIXEL Wizard 2.0はインターネットを介してツール本体とファームウェアの更新版を取得する事ができる。ここでは詳説しないが、インターネットに接続された環境でDYNAMIXEL Wizard 2.0を起動した直後に自動的に更新がないかを確認し、必要があれば更新するか否かを問い合わせてくる。Install Nowボタンを押すと更新が開始されるが、特に適用する必要が無いと判断したらInstall laterボタンを押す事で更新はキャンセルされる。
インストールしたDYNAMIXEL Wizard 2.0を起動したらツールバーのOptionsボタンをクリックし、開いたOptionsダイアログボックスの左側のScanをクリックする。
以下の項目を設定してOKボタンを押すと、DYNAMIXEL Wizard 2.0の検索条件の期設定はひとまず完了である。次回起動した際も本設定が反映される。
なお本設定は出荷時のDYNAMIXEL Xシリーズを対象としているため、後々のDYNAMIXELの設定等によっては以下のように適宜変更する。
DXHUBの電源スイッチをONにして電源の供給を開始すると青色のLEDが点灯し、DYNAMIXELに装備されたLEDが0.5秒だけ点灯する。その後DYNAMIXEL Wizard 2.0のツールバーのScanボタンをクリックすると、検索条件の設定に従って検索が開始される。検索中のダイアログボックスにはプログレスバーが表示され、見つかったDYNAMIXELはダイアログボックスとメイン画面の画面左側のCOMポート番号と通信速度のツリーの配下に逐次列挙される。
Select baudrate to scan.の全ての通信速度がチェックされていると検索の終了までにかなりの時間がかかるが、検索中に目的のDYNAMIXELが見つかったり途中で中断する場合はSkipボタンを押す。
DYNAMIXEL Wizard 2.0は検索によって見つかったDYNAMIXELのみを操作対象とするため、検索は起動直後に毎回強いられる。検索条件の設定のSelect baudrate to scan.のチェックを減らしたり、Set ID range to scan.のStat及びEndのID番号を適宜変更する事で検索時間を短縮できる。
もし検索で見つからない場合は、検索条件の設定が対象のDYNAMIXELに合致しない・COMポートが間違っている・電源が入っていない・いずれかの装置の故障等の理由が考えられるが、原因を特定するのに少々難儀する場合もある。
出荷時状態のDYNAMIXELの動作モードは大抵「位置決め制御」モードとなっており、任意の位置を指示する事でDYNAMIXELのサーボホーンの位置がそれに伴って移動する。DYNAMIXEL Wizard 2.0では以下の手順と操作で位置を指示できる。
この手順で思ったようにDYNAMIXELのホーンが動かない場合は、DYNAMIXELが故障している可能性が高い。
複数のDYNAMIXELを同時に運用する場合は、個々に異なるID番号を設定しておかなくてはならない。通信プロトコルの仕様では複数のDYNAMIXELのID番号が同じだと個々を識別できないからである。
DYNAMIXELのデフォルトの通信速度は57600bpsだが、軸数が増えたりタイトな通信をし始めるとトラフィックが渋滞し、体感的にも応答が遅いと感じたりする。高い通信速度は様々な環境に影響されやすいため最高速まで設定する必要は無いが、1000000bps程度であれば経験的に安定して使用できる速度と考える。以下に通信速度を変更する手順を示す。
Value | 通信速度[bps] |
0 | 9,600 |
1 | 57,600 |
2 | 115,200 |
3 | 1,000,000 |
4 | 2,000,000 |
5 | 3,000,000 |
6 | 4,000,000 |
7 | 4,500,000 |
プログラミングの知識が無ければ何も始まらないし、それ相応の環境を整える必要もある。
ここでは弊社が提供するWindows向けの簡易的な統合環境と専用のライブラリを用い、C言語でコーディングする。コンパイル後に生成されるプログラムは、いずれもCUIベースである。
DYNAMIXELは専用の通信プロトコルでのみ通信できるが、通信プロトコルを処理するプログラムを自前でコーディングするのにはそれ相応の知識と労力が要求される。その手間を軽減させる目的で提供しているのがDynamixel Protocol 2 Library(DX2LIB)である。
他にもこういった目的のライブラリはあるが、DYNAMIXELの主要な機能を利用する際にモデルやコントロールテーブルの差異をほぼ意識しなくて済む追加APIをDX2LIBは持っている。追加APIを使用している限りユーザーはCOMポート番号・通信速度・ID番号程度を把握していれば良く、最終的には少ないソースコード量で大抵の処理が行えると謳っている。また各APIはコンパイル済みのDLLに含まれているので、自分のプログラムから呼び出す手段さえあれば言語は問わない。
見ての通り追加APIのソースでは大した事はしておらず、モデルによって異なるアイテムの変換や手順を強いられるアイテムの手順操作、物理値とアイテム間の単位変換、複数IDの一括処理に終始しているだけである。しかし追加APIを使わない場合は、コントロールテーブルのアイテムの機能を習熟する段取りから始めざるを得ないので、説明を端折るにはありがたい 。
なおプロトコルV1.0用のライブラリも用意されているので、どうしてもDX/AX/RX/EX/MXシリーズを動かしたいという場合は所々読み替えて対応してもらいたい。
また我々としてはプロトコルV1.0とV2.0を混在させた運用は危険と判断しているのでライブラリはプロトコルのバージョン別に用意しているが、DYNAMIXEL SDKでは同時に運用できる事になっている。
こちらより最新のアーカイブファイルをダウンロードし、適宜解凍する。
以後に紹介するGCC Developer Liteでコーディング・コンパイル・実行には、ライブラリに含まれる次の4つのファイルがソースコードファイルを保存したフォルダと同じ場所に配置されている事が前提となる。
ファイル名 | 説明 |
dx2lib.h | 各APIのプロトタイプ宣言等を含むヘッダファイル |
dx2memmap.h | コントロールテーブル上の主要アイテムのアドレスが定義されたヘッダファイル |
dx2lib_x32.dll | ライブラリ本体が含まれた32ビット版DLL |
dx2lib_x64.dll | ライブラリ本体が含まれた64ビット版DLL |
このうち「dx2memmap.h」は一部のDYNAMIXELのコントロールテーブルに配置されたアイテムのアドレスを記述しているファイルなのだが、アイテムのアドレスを指定してアクセスしない限り必須ではない。また32ビット版と64ビット版のDLLはコンパイル時の条件によっていずれか一方で構わない。その他のファイルは一切必要ない。
GCC Developer Liteはマイコンボード製品向けにプログラムを手っ取り早く動かせるツールとして提供しているが、ここではWindows上で動作するプログラムをコーディング及びコンパイルする事が目的となる。
最近の統合環境の類いに比べたら極めて少ない機能しか備えておらず正直なところ今更な感じではあるのだが、無償かつ簡単な設定でソースコードの編集とコンパイルができるという理由だけで選定した。
ここではWindows向けの実行プログラムを生成する事が目的なので、ここから「基本パック」と「WIN64パック」をダウンロードする。手順を間違えるとインストールできないので、こちらを読んでから作業する。
後述のソースコードを使用する際に必要な操作を紹介する。
まずブラウザから該当のソースコードをクリップボードにコピーし、GCC Developer Liteのエディタウィンドウ上にペーストする。ソースコード中のCOMポート番号の部分("\\\\.\\COM3")の数値を自身の環境に合わせて書き換えたら編集作業は完了である。
編集中のソースコードをファイルとして保存しておけば、後々再度読み出して再利用できる。保存するには「ファイル(F)」メニュー内の「名前を付けて保存(A)...」をクリックし任意の場所と名前で保存する。なおファイル名のフルパスに全角文字・機種依存文字・空白が含まれているとコンパイルに支障をきたすので、基本的には半角の英数字で構成しておく事。
Windowsの64bit版を想定したのでWIN64パックをインストールする事としたが、32bitの場合はWIN32パックを選定するまでである。もし32bit版を適用した場合は64bitとある所を32bit、x64とある所をx86に読み替える事。
コンパイルしたいソースコードを開いている状態で、「ツール(T)」メニュー内の「コンパイルオプション(O)」をクリックしコンパイルオプションダイアログボックを開く。上端の設定リストをドロップダウンし「Windows x64 (Console)」を選んでOKボタンを押すと、32ビット版のコンパイル条件の設定は全て完了する。
なお後述のビルドやデバッグ情報付きビルドを行った既存のソースコードファイルを開き直した場合、「保存済みの環境設定ファイルが見つかりました。」というメッセージダイアログボックスが表示される。
このメッセージは過去にコンパイルオプションを設定してコンパイルされた事を意味し、OKボタンを押す事でコンパイラオプションを設定したのと同等の扱いとなる。
後述のライブラリを利用したソースコードをコンパイルするには、ライブラリに含まれる4つのファイルが必要となる。それらファイルを予めソースコードを保存したフォルダにコピーしておく事を忘れないように。
コンパイラオプションが正常に設定されていれば、「コンパイル(C)」メニュー内の「ビルド(B)」が利用できるようになる。「ビルド(B)」をクリックする(もしくは「F9」キーを押す)とコンパイラが呼び出され、問題が無ければソースコードから実行可能なプログラムが同じフォルダの中に生成される。
例えばソースコードのファイル名が「test1.c」ならば、「test1.exe」というファイル名で実行ファイルが生成される。実行ファイルが生成されたら、そのファイルをエクスプローラー等から開いて実行する。
実行中はコマンドプロンプトが表示され、標準出力へ何かしらのメッセージが出力される。
なおエクスプローラーから起動したプログラムは、メッセージを確認する間もなく終了と同時にコマンドプロンプトが閉じてしまう。メッセージを確認する必要がある場合は予めコマンドプロンプト(cmd.exe)を起動しておき、その中でコンパイルしたプログラムを実行すれば終了時に勝手に閉じることはない。
ビルドと実行と大差は無いが、コンパイル後にデバッガを介してプログラムを実行する。ここではデバッガを使ってデバッグする事が目的というよりも、GCC Developer Liteのみでコンパイルからプログラムの実行まで全て行うためだけに使う。
コンパイラオプションが正常に設定されていれば、「コンパイル(C)」メニュー内の「デバッグビルド&デバッガで開く(G)」が利用できるようになる。「デバッグビルド&デバッガで開く(G)」をクリックする(もしくは「Ctrl」+「F9」キーを押す)とコンパイラが呼び出され、問題が無ければソースコードから実行可能なプログラムが同じフォルダの中に生成される。
コンパイルが成功すると「デバッガ(GDB)を起動しますか?」と問い合わせてくるので、OKを押すとデバッガ(GDB)が開く。
GDBの操作説明は省くが、そのプロンプト上で「r」と入力してエンターキーを押すとコンパイルしたプログラムが実行される。
なおデバッガに興味を持ったのであれば、その使い方を検索してみると良い。
ソースコードの編集以外には大した機能は無いが、その中でも余り知られていない機能を紹介する。
紹介するソースコード上ではID番号が1~4に設定されたDYNAMIXELがあり、信速度は1Mbpsに変更してあるものとした。自身の環境向けにはPC上のDXHUBに割り当てられたCOMポートの番号を修正する程度で済む筈だ。ID番号が4つよりも少なくても多くても動かす事はできるが、少なくともID番号が1と2に設定されたDYNAMIXELが無いと一部の機能の確認ができない。
またDYNAMIXELのコントロールテーブルには機能を大きく左右するOperating Modeなるアイテムがあり、出荷時は3(Position Control Mode)に設定されている。紹介するソースコードによってはそのモードを変更しており、DYNAMIXELの電源を投入し直してもモードはそのまま保持される。他の目的でDYNAMIXELを使用する際はOperating Modeが変更されている可能性がある事を常に失念しないよう注意する事。
さらに初期条件を整える時を除いてエラー処理の類いを行っておらず、何かしらの不都合があったとしてもプログラムの実行は停止しないまでか異常終了する事も考えられる。DX2LIBのAPIの大半は戻り値で実行の正否の判定ができるので、堅牢なアプリケションを構成する際は判断材料にすると良いだろう。
少なくともソースコードをコンパイルするに当たっての前提は、
である。
なおGCC Developer LiteはGNUツールを用いてコンパイルしているため、MSYS2の環境でもコンパイルできる。またCOMポート名や時間にかかる関数を修正すれば、他のOSでも容易に流用できる。
以後のソースコードには、APIを使用するのに共通する最低限必要な処理が含まれている。
最終的には検索とCOMポートのクローズの間に実際に行いたい処理を追記する事になる。
DXL_ScanDevicesによる検索はDYNAMIXELのモデルが想定と違っていてもある程度順応できるようにと考えての事だが、諸設定が最適に設定されていたとしても検索が終わるまではそれ相応の時間(約10秒程度)かかる。
// DYNAMIXELの検索とモデル名の列挙 #include <stdio.h> #define _DYNAMICLOAD // DX2LIBの動的呼び出しを宣言 #include "dx2lib.h" // DX2LIBのヘッダ int main (void) { // DLLの読み込み(成功は必須) if (LoadDLL ()) { uint8_t IDs[253]; // 見つかったIDのリスト int detectnum = 0; // 見つかった数 // COMポートのオープン TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000); // devが0でなければオープン成功 if (dev) { // 検索 detectnum = DXL_ScanDevices (dev, IDs); // 見つかったモデルをコンソールに列挙 DXL_PrintDevicesList ((void *)&printf); // この辺りに目的の処理を追記 // COMポートのクローズ DX2_ClosePort (dev); } // DLLの破棄 UnloadDLL (); } }
正常に検索が終了すると左からID番号・モデル名・モデル番号の順に次のような情報が表示される。
[ 1] 2XL430-W250 ($0442) [ 2] 2XL430-W250 ($0442) [ 3] 2XL430-W250 ($0442) [ 4] 2XL430-W250 ($0442)
なおID番号が1~4のDYNAMIXELが存在する前提でのDXL_GetModelInfo APIを使った登録方法も一応紹介しておく。こちらは指定されたID番号のみを対象に登録できるかを判断するので、全ID番号を対象とした検索を行った場合と異なりほぼ一瞬で処理が終了する。
// ID番号を決め打ちする場合 #include <stdio.h> #define _DYNAMICLOAD #include "dx2lib.h" int main (void) { // DLLの読み込み(成功は必須) if (LoadDLL ()) { // COMポートのオープン TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000); // devが0でなければオープン成功 if (dev) { // 検索の代わりにID番号を直接指定して登録 DXL_GetModelInfo (dev, 1); DXL_GetModelInfo (dev, 2); DXL_GetModelInfo (dev, 3); DXL_GetModelInfo (dev, 4); // 登録されたモデルをコンソールに列挙 DXL_PrintDevicesList ((void *)&printf); // この辺りに目的の処理を追記 // COMポートのクローズ DX2_ClosePort (dev); } // DLLの破棄 UnloadDLL (); } }
サイレントに動作確認を行うには、DYNAMIXELに搭載されているLEDを使うと良いだろう。
ID番号を指定してLEDを明滅させるDXL_SetLED APIを用いて、個々のID番号を順に指定してLEDをON/OFFさせる。なおAPIの処理そのものは一瞬で終わってしまうので、目に見えるように10回のループ内のLEDのONとOFFの後にSleep命令で500msの中断を挿入した。
// LEDのON/OFF #include <stdio.h> #define _DYNAMICLOAD #include "dx2lib.h" int main (void) { if (LoadDLL ()) { uint8_t IDs[253]; // 見つかったIDのリスト int detectnum = 0; // 見つかった数 TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000); if (dev) { detectnum = DXL_ScanDevices (dev, IDs); DXL_PrintDevicesList ((void *)&printf); // とりあえず10回繰り返す for (int i = 0; i < 10; i++) { // 全軸のLEDをONに設定 printf ("ON "); for (int j = 0; j < detectnum; j++) DXL_SetLED (dev, IDs[j], true); Sleep (500); // 全軸のLEDをOFFに設定 printf ("OFF "); for (int j = 0; j < detectnum; j++) DXL_SetLED (dev, IDs[j], false); Sleep (500); } DX2_ClosePort (dev); } UnloadDLL (); } }
なおDYNAMIXELに何らかのアラームが発生した事によりLEDが自動点滅している間は、LEDの制御ができない。
試運転の際にも行っている通りトルクのONという操作を予め行っておかないと、位置や速度を指令してもホーンは動かない。コントロールテーブル上のアイテム名が「Torque Enable」となっているためにトルクと称しているが、実際には設定された制御モードに従って制御を開始・停止する事を意味している。
DXL_SetTorqueEnablesEquival APIを用いて引数で指定されたID番号の一覧を用いて一斉にトルクのON/OFFを切り替える。
// トルクのON/OFF #include <stdio.h> #define _DYNAMICLOAD #include "dx2lib.h" int main (void) { if (LoadDLL ()) { uint8_t IDs[253]; // 見つかったIDのリスト int detectnum = 0; // 見つかった数 TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000); if (dev) { detectnum = DXL_ScanDevices (dev, IDs); DXL_PrintDevicesList ((void *)&printf); // とりあえず意味も無く5回繰り返す for (int i = 0; i < 5; i++) { // 全軸のトルクをONに設定 printf ("ON "); DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, true); Sleep (1000); // 全軸のトルクをOFFに設定 printf ("OFF "); DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, false); Sleep (1000); } DX2_ClosePort (dev); } UnloadDLL (); } }
トルクがOFFの時にDYNAMIXELは脱力し、外力でホーンを動かす事ができる。
またコントロールテーブル中のアイテムの一部にはトルクがONの状態では変更できないものもあるため、APIによっては必要に応じてトルクのOFFを発行しているものがある。
DYNAMIXELの本分である位置決め制御を行う。
DXL_SetOperatingModesEquival APIを使用して全DYNAMIXELの動作モードを3:Position Control Modeに設定(このAPIは内部でトルクOFFを発行する)。全軸のトルクをON。コメントにあるように1秒かけて全軸の角度を45度へ移動→2秒かけて全軸の角度を-45度へ移動→1秒かけて全軸の角度を0度へ移動。最後に全軸のトルクをOFFにして終了する。foo関数内のDXL_SetGoalAnglesAndTime APIは軸数分の角度の配列と現在の角度からの移動時間を秒で指定するAPIだが、DYNAMIXELへの指令を終えると移動時間を待たずに即時復帰する。そのため直後にSleep関数で移動している間だけプログラムの進捗を意図的に停止させている。
// 角度指令 #include <stdio.h> #define _DYNAMICLOAD #include "dx2lib.h" // 角度指令と時間待ち関数 bool foo (TDeviceID dvid, const uint8_t *ids, const double *angles, int num, double sec) { bool result = DXL_SetGoalAnglesAndTime (dvid, ids, angles, num, sec); Sleep (sec * 1000); // 移動完了まで待機 return result; } int main (void) { if (LoadDLL ()) { uint8_t IDs[253]; // 見つかったIDのリスト int detectnum = 0; // 見つかった数 double angles[253]; // 角度の配列 TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000); if (dev) { detectnum = DXL_ScanDevices (dev, IDs); DXL_PrintDevicesList ((void *)&printf); // 全軸を 3: Position Control に設定 DXL_SetOperatingModesEquival (dev, IDs, detectnum, 3); // 全軸のトルクをONに設定 DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, true); // 1秒かけて全軸の角度を45度へ移動 printf ("45deg "); for (int j = 0; j < detectnum; j++) angles[j] = 45.0; foo (dev, IDs, angles, detectnum, 1.0); // 2秒かけて全軸の角度を-45度へ移動 printf ("-45deg "); for (int j = 0; j < detectnum; j++) angles[j] = -45.0; foo (dev, IDs, angles, detectnum, 2.0); // 1秒かけて全軸の角度を0度へ移動 printf ("0deg "); for (int j = 0; j < detectnum; j++) angles[j] = 0.0; foo (dev, IDs, angles, detectnum, 1.0); // 全軸のトルクをOFFに設定 DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, false); DX2_ClosePort (dev); } UnloadDLL (); } }
角度や時間を変えて実際の挙動がどのようになるか試して欲しい。
こちらもDYNAMIXELの本分であるフィードバック値の取得を行う。
全軸のDYNAMIXELのトルクをOFFにして外力でホーンを動かせる状態にした後、DXL_GetPresentAngles APIを使用して複数軸の現在角度を取得し、ID番号と合わせて取得した角度を全軸分表示する。角度の取得と表示を何かしらのキー入力を行うまでループし続ける。
// 角度取得 #include <stdio.h> #include <conio.h> #define _DYNAMICLOAD #include "dx2lib.h" int main (void) { if (LoadDLL ()) { uint8_t IDs[253]; // 見つかったIDのリスト int detectnum = 0; // 見つかった数 double angles[253]; // 角度の配列 TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000); if (dev) { detectnum = DXL_ScanDevices (dev, IDs); DXL_PrintDevicesList ((void *)&printf); // 全軸を 3: Position Control に設定 DXL_SetOperatingModesEquival (dev, IDs, detectnum, 3); DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, false); // キー入力があるまでループ while (!_kbhit ()) { // 全軸の現在角度を取得 DXL_GetPresentAngles (dev,IDs, angles, detectnum); // 取得した角度を表示 printf ("\r"); for (int j = 0; j < detectnum; j++) printf("[%d]%6.1f ", IDs[j], angles[j]); } DX2_ClosePort (dev); } UnloadDLL (); } }
現在のDYNAMIXELのホーンの位置を取得して他のDYNAMIXELへの角度指令値として利用する。
ID番号が1のDYNAMIXELのトルクをOFFにして外力でホーンを動かせる状態にし、DXL_GetPresentAngle APIで現在のホーンの角度を取得する。その後DXL_SetGoalAngles APIを用いてその角度を全軸のDYNAMIXELへ指令する。ID番号1に対しても角度を指令しているが、予めトルクをOFFにしているのでホーンは動かない。角度の取得と位置の指令を何かしらのキー入力を行うまでループし続ける。
// 角度取得と角度指令 #include <stdio.h> #include <conio.h> #define _DYNAMICLOAD #include "dx2lib.h" int main (void) { if (LoadDLL ()) { uint8_t IDs[253]; // 見つかったIDのリスト int detectnum = 0; // 見つかった数 double angle = 0; // 現在の角度 double angles[253]; // 角度の配列 TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000); if (dev) { detectnum = DXL_ScanDevices (dev, IDs); DXL_PrintDevicesList ((void *)&printf); // 全軸を 3: Position Control に設定 DXL_SetOperatingModesEquival (dev, IDs, detectnum, 3); DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, true); // ID=1のみトルクをOFFに設定 DXL_SetTorqueEnable (dev, 1, false); // キー入力があるまでループ while (!_kbhit ()) { // ID=1の現在角度を取得 DXL_GetPresentAngle (dev, 1, &angle); printf("\rPresent Angle = %4.1f ", angle); // 全軸に同じ目標角度を指令 for (int j = 0; j < detectnum; j++) angles[j] = angle; DXL_SetGoalAngles (dev, IDs, angles, detectnum); } DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, false); DX2_ClosePort (dev); } UnloadDLL (); } }
なお前述のDXL_SetGoalAnglesAndTime APIを使ったプログラムを実行すると、移動時間の情報がDYNAMIXELに残ったままになる。その直後に本プログラムを実行した場合、ID番号1のホーンの動きに他のホーンが即応しないように見える。その場合はDYNAMIXELへ供給していた電源を一旦OFFにした後に再度ONにしてからプログラムを実行する事。
最高速度はあまり速くはないが、DYNAMIXELにホイールを装着して台車の足回りに使う事ができる。
DXL_SetOperatingModesEquival APIを使用して全DYNAMIXELの動作モードを1:Velocity Controlに設定(このAPIは内部でトルクOFFを発行する)。全軸のトルクをON。DXL_SetGoalVelocities APIを使用して全軸の角速度を30deg/sに設定→全軸の角速度を-30deg/sに設定→全軸の角速度を0deg/sに設定。最後に全軸のトルクをOFFにして終了する。
// 角速度指令 #include <stdio.h> #include <conio.h> #define _DYNAMICLOAD #include "dx2lib.h" int main (void) { if (LoadDLL ()) { uint8_t IDs[253]; // 見つかったIDのリスト int detectnum = 0; // 見つかった数 double velo[253]; // 角速度の配列 TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000); if (dev) { detectnum = DXL_ScanDevices (dev, IDs); DXL_PrintDevicesList ((void *)&printf); // 全軸を 1: Velocity Control に設定 DXL_SetOperatingModesEquival (dev, IDs, detectnum, 1); DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, true); // 全軸の角速度を30deg/sに printf ("30deg/s "); for (int j = 0; j < detectnum; j++) velo[j] = 30.0; DXL_SetGoalVelocities (dev, IDs, velo, detectnum); Sleep (5000); // 5秒待ち // 全軸の角速度を-30deg/sに printf ("-30deg/s "); for (int j = 0; j < detectnum; j++) velo[j] = -30.0; DXL_SetGoalVelocities (dev, IDs, velo, detectnum); Sleep (5000); // 5秒待ち // 全軸の角速度を0deg/sに printf ("0deg/s "); for (int j = 0; j < detectnum; j++) velo[j] = 0.0; DXL_SetGoalVelocities (dev, IDs, velo, detectnum); Sleep (1000); // 1秒待ち DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, false); DX2_ClosePort (dev); } UnloadDLL (); } }
角度指令の応用で、各軸に与える角度指令値を時間に応じて複数パターンで変遷させる。
ここでは各DYNAMIXELの目標角度とその角度へ移行させる時間をTMotion構造体とし、複数のTMotion構造体を連続して実行させる事で一連の動きを実現している。またID番号1~8があるものとしてDXL_GetModelInfo APIを使用して検索時間を無くした。
GetQueryPerformanceCounter APIはPCの起動からの時間をmsで取得できるので、モーションで指定された時間を待機する際に利用している。
// モーション再生 #include <stdio.h> #include <conio.h> #define _DYNAMICLOAD #include "dx2lib.h" #define AXISNUM (8) // 軸数 #define MSize(d) (sizeof(d) / sizeof(d[0])) // 存在の有無にかかわらず指定軸数分のID番号のテーブル const uint8_t IDs[AXISNUM] = { 1, 2, 3, 4, 5, 6, 7, 8 }; // 各軸の角度と遷移時間の構造体 typedef struct { double angles[AXISNUM]; // 軸数分の角度情報 double sec; // 遷移時間 } TMotion; // モーションデータ1 // 8軸を1秒かけて0度(センター)に移動 const TMotion motion1[] = { {{ 0, 0, 0, 0, 0, 0, 0, 0}, 1.0}, }; // モーションデータ2 // 8軸の5つの様々な型を各秒数かけて移動 const TMotion motion2[] = { {{ 90, 390, 90, 90, 90, 90, 90, 90}, 3.0}, {{ 0, 0, 0, 0, 0, 0, 0, 0}, 2.0}, {{-290,-190, -90, -90, -90, -90, -90, -90}, 3.0}, {{ 45, 145, 45, 45, 45, 45, 45, 45}, 3.0}, {{ -30,-230, -30, -30, -30, -30, -30, -30}, 3.0}, }; // モーション再生 void PLAY (TDeviceID dev, const uint8_t *ids, const TMotion *motion, int framenum) { for (int i = 0; i < framenum; i++) { double t = GetQueryPerformanceCounter () + motion[i].sec * 1000; DXL_SetGoalAnglesAndTime (dev, ids, motion[i].angles, AXISNUM, motion[i].sec); while (t > GetQueryPerformanceCounter ()) Sleep (1); } } int main (void) { if (LoadDLL ()) { TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000); if (dev != 0) { for (int i = 0; i < AXISNUM; i++) printf ("id=%2d, ModelName=%s\n", IDs[i], DXL_GetModelInfo (dev, IDs[i])->name); // 全軸を 4: Expand Position Control に設定 DXL_SetOperatingModesEquival (dev, IDs, AXISNUM, 4); DXL_SetTorqueEnablesEquival (dev, IDs, AXISNUM, true); // motion1を再生 printf("motion1 "); PLAY (dev, IDs, motion1, MSize (motion1)); // motion2を再生 printf("motion2 "); PLAY (dev, IDs, motion2, MSize (motion2)); DXL_SetTorqueEnablesEquival (dev, IDs, AXISNUM, false); DX2_ClosePort (dev); } UnloadDLL (); } }
1つのパケット処理にかかる時間が短ければ単位時間当たりにこなせるパケット数が増えるため、自ずと通信速度が高い環境が望まれる。その恩恵に与かれるものとしてOSの並列処理が挙げられる。
ここでは角度と電流値を取得してモニタし続ける処理をthread1、角度を指令し続ける処理をthread2といった具合に個別のスレッドに割り当てた。下準備と終了処理はmain内で行っているが、それ以外は各々のスレッドが各々独立して処理している。
各々のスレッドに使用しているAPIが互いを意識しないコードになっているのは、DX2LIBのAPIが排他的に処理されるように作られているためである。しかし1つのスレッドでAPIを無限ループで遅滞なく呼び出し続けられてしまうとリソースを独占されてしまうため、ループ中にSleepを入れて適宜コンテキストスイッチを促している。
// マルチスレッド #include <stdio.h> #include <conio.h> #define _DYNAMICLOAD #include "dx2lib.h" #include <pthread.h> #include <math.h> bool term = false; // スレッド終了フラグ // スレッドに渡すパラメータ typedef struct { TDeviceID dev; // デバイスID const uint8_t *ids; // IDの配列 int num; // ID数 } TThInfo; // スレッド1 (モニタ) void *thread1 (TThInfo *info) { double pangle[info->num], pcur[info->num]; while (!term) { // 現在位置取得 if (DXL_GetPresentAngles (info->dev, info->ids, pangle, info->num)) { // 現在電流取得 if (DXL_GetPresentCurrents(info->dev, info->ids, pcur, info->num)) { printf("\r"); for (int i = 0; i < info->num; i++) printf ("(%d:%5.1f %6.1f) ", info->ids[i], pangle[i], pcur[i]); } } Sleep (5); } return (void *)NULL; } // スレッド2 (正弦波で角度指令) void *thread2 (TThInfo *info) { double ang, angle[info->num]; while (!term) { // 時間から±60度の角度を生成 (4秒周期) ang = sin (0.5 * M_PI * GetQueryPerformanceCounter () * 0.001) * 60.0; for (int i = 0; i < info->num; i++) angle[i] = ang; DXL_SetGoalAngles (info->dev, info->ids, angle, info->num); Sleep (5); } return (void *)NULL; } int main (void) { pthread_t th1, th2; if (LoadDLL ()) { uint8_t IDs[253]; int detectnum = 0; TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000); if (dev) { detectnum = DXL_ScanDevices (dev, IDs); DXL_PrintDevicesList ((void *)&printf); // 全軸を 3: Position Control に設定 DXL_SetOperatingModesEquival (dev, IDs, detectnum, 3); DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, true); // 2つのスレッドを作成 TThInfo info = { dev, IDs, detectnum }; pthread_create(&th1, NULL, (void *)thread1, (void *)&info); pthread_create(&th2, NULL, (void *)thread2, (void *)&info); // キー入力があるまでループ while (!_kbhit ()) { Sleep (100); } // スレッド終了を励起 term = true; // スレッド終了待ち pthread_join(th1, NULL); pthread_join(th2, NULL); DXL_SetTorqueEnablesEquival (dev, IDs, detectnum, false); DX2_ClosePort (dev); } UnloadDLL (); } }
同時に他のアプリケーションを使用すると、大抵の場合DYNAMIXELの挙動がおかしくなる。実際にはプログラムの実行が他のアプリケーションによって阻害され、
単純なループで高速に角度指令を行っている処理(こういった角度指令の仕方は旧来のDYNAMIXELの常套手段だった)が間延びしてしまう事が原因だ。マルチタスク環境において何がどのように影響するかを微調整する術はあるので、時間があるときにでも調べて欲しい。
またあえて諸々の通信速度を9600bpsに変更した場合も、最終的な動作にどの程度影響するかを試しても面白い。
その結果、Windows上でいわゆるリアルタイム性を担保するのは一筋縄ではいかない事が分かってもらえれば良いかと。
DX2LIBのページに紹介されている通りだが、その中でも流行のPythonで遊んでみても面白いかと思う。
GCC Developer LiteにはPythonが含まれており、コンパイルオプションで「Python(32bit)」や「Python(64bit)」を選ぶ事でライブラリに同梱されるPythonのサンプルソースコードが利用できる。しかし追加APIを利用したものばかりで、コントロールテーブルを読み書きする基本的なAPIは使われていない。
ここでは基本的なAPIのみを使った例を紹介するが、dx2lib.pyはC言語で作成されたライブラリをPythonで利用するためだけの記述がなされているため、Pythonならではの柔軟なコードがctypesによって変に厳格化されて使い勝手が悪い。
ひとまずDynamixel Xシリーズを前提とし、主要なアイテムのアドレスとコントロールテーブルを宣言してみた。
from ctypes import * ADDRESS_X_MODEL_NUMBER = 0 #W ADDRESS_X_MODEL_INFORMATION = 4 #L ADDRESS_X_VERSION_FW = 6 #B ADDRESS_X_ID = 7 #B ADDRESS_X_BAUDRATE = 8 #B ADDRESS_X_RETURN_DELAY_TIME = 9 #B ADDRESS_X_DRIVE_MODE = 10 #B ADDRESS_X_OPERATIING_MODE = 11 #B ADDRESS_X_HOMING_OFFSET = 20 #L ADDRESS_X_MAX_POSITION_LIMIT = 48 #L ADDRESS_X_MIN_POSITION_LIMIT = 52 #L ADDRESS_X_TORQUE_ENABLE = 64 #B ADDRESS_X_LED_RED = 65 #B ADDRESS_X_STATUS_RETURN_LEVEL = 69 #B ADDRESS_X_HARDWARE_ERROR_STATUS = 70 #B ADDRESS_X_VELOCITY_IGAIN = 76 #W ADDRESS_X_VELOCITY_PGAIN = 78 #W ADDRESS_X_POSITION_DGAIN = 80 #W ADDRESS_X_POSITION_IGAIN = 82 #W ADDRESS_X_POSITION_PGAIN = 84 #W ADDRESS_X_GOAL_PWM = 100 #W ADDRESS_X_GOAL_CURRENT = 102 #W ADDRESS_X_GOAL_VELOCITY = 104 #L ADDRESS_X_PROF_ACCELERATION = 108 #L ADDRESS_X_PROF_VELOCITY = 112 #L ADDRESS_X_GOAL_POSITION = 116 #L ADDRESS_X_MOVING = 122 #B ADDRESS_X_MOVING_STATUS = 123 #B ADDRESS_X_PRESENT_PWM = 124 #W ADDRESS_X_PRESENT_CURRENT = 126 #W ADDRESS_X_PRESENT_VELOCITY = 128 #L ADDRESS_X_PRESENT_POSITION = 132 #L ADDRESS_X_VELOCITY_TRAJECTORY = 136 #L ADDRESS_X_POSITION_TRAJECTORY = 140 #L ADDRESS_X_PRESENT_VOLTAGE = 144 #W ADDRESS_X_PRESENT_TEMP = 146 #B class XCtrlTable(Structure): _pack_ = 1 _fields_ = [ ('ModelNumber', c_uint16), ('ModelInformation', c_uint32), ('VersionofFirmware', c_uint8), ('ID', c_uint8), ('Baudrate', c_uint8), ('ReturnDelayTime', c_uint8), ('DriveMode', c_uint8), ('OperatingMode', c_uint8), ('SecondaryID', c_uint8), ('ProtocolVersion', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('HomingOffset', c_int32), ('MovingThreshold', c_uint32), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('TemperatureLimit', c_uint8), ('MaxVoltageLimit', c_uint16), ('MinVoltageLimit', c_uint16), ('PWMLimit', c_uint16), ('CurrentLimit', c_uint16), ('AccelerationLimit', c_uint32), ('VelocityLimit', c_uint32), ('MaxPositionLimit', c_uint32), ('MinPositionLimit', c_uint32), ('ExternalPortMode1', c_uint8), ('ExternalPortMode2', c_uint8), ('ExternalPortMode3', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('Shutdown', c_uint8), ('TorqueEnable', c_uint8), ('LED', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('StatusReturnLevel', c_uint8), ('RegisteredInstruction', c_uint8), ('HardwareErrorStatus', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('VelocityIGain', c_uint16), ('VelocityPGain', c_uint16), ('PositionDGain', c_uint16), ('PositionIGain', c_uint16), ('PositionPGain', c_uint16), ('reserve', c_uint8), ('reserve', c_uint8), ('FeedforwardAccelerationGain', c_uint16), ('FeedforwardVelocityGain, ', c_uint16), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('BusWatchdog', c_int8), ('reserve', c_uint8), ('GoalPWM', c_int16), ('GoalCurrent', c_int16), ('GoalVelocity', c_int32), ('ProfileAcceleration', c_uint32), ('ProfileVelocity', c_uint32), ('GoalPosition', c_int32), ('RealtimeTick', c_uint16), ('Moving', c_uint8), ('MovingStatus', c_uint8), ('PresentPWM', c_int16), ('PresentCurrent', c_int16), ('PresentVelocity', c_int32), ('PresentPosition', c_int32), ('VelocityTrajectory', c_uint32), ('PositionTrajectory', c_uint32), ('PresentInputVoltage', c_uint16), ('PresentTemperature', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('reserve', c_uint8), ('ExternalPortData1', c_uint16), ('ExternalPortData2', c_uint16), ('ExternalPortData3', c_uint16)] class VelocityGain(Structure): _pack_ = 1 _fields_ = [ ('VelocityIGain', c_uint16), ('VelocityPGain', c_uint16)] class PositionGain(Structure): _pack_ = 1 _fields_ = [ ('PositionDGain', c_uint16), ('PositionIGain', c_uint16), ('PositionPGain', c_uint16)]
例えばPresent PositionをID=1のDynamixelのコントロールテーブルから読み出すには、以下のコードで対応できる。
from dx2lib import * dev = DX2_OpenPort (b'\\\\.\\COM3', 57600) if dev != None: ppos = c_int32() err = c_uint16() DX2_ReadLongData (dev, 1, ADDRESS_X_PRESENT_POSITION, cast(byref(ppos), POINTER(c_uint32)), err) DX2_ClosePort (dev)
更にXCtrlTable構造体を用いて変数を定義し、その変数へID=1のDynamixelのコントロールテーブルから一括読み出した値を保存する場合はDX2_ReadBlockDataを用いる。
from dx2lib import * dev = DX2_OpenPort (b'\\\\.\\COM3', 57600) if dev != None: tbl = XCtrlTable() err = c_uint16() if DX2_ReadBlockData (dev, 1, ADDRESS_X_MODEL_NUMBER, cast(byref(tbl), POINTER(c_uint8)), sizeof(tbl), err): for field in tbl._fields_: if field[0] != 'reserve': print (tbl.__class__.__dict__[field[0]].offset, field[0], '=', getattr(tbl, field[0])) DX2_ClosePort (dev)
位置決め制御時のPIDゲインを設定する等の連続したアドレスに割り当てられた複数のアイテムを同時に書き換える場合はDX2_WriteBlockDataを用いる。
from dx2lib import * dev = DX2_OpenPort (b'\\\\.\\COM3', 57600) if dev != None: gain = PositionGain(0, 0, 400) DX2_WriteBlockData (dev, 1, ADDRESS_X_POSITION_DGAIN, cast(byref(gain), POINTER(c_uint8)), sizeof (gain), None) DX2_ClosePort (dev)
追加APIに無い機能は基本APIを使ってコントロールテーブルへ直接アクセスするしかないため、参考にしてもらえればと思う。
全てPythonで書いてしまえば、態々Cで作られたライブラリをリンクしたり、ctypesの有象無象に呵まれる事も無くスッキリする。という事でインストラクションパケットの生成と送信、ステータスパケットの受信、インストラクションの種類によってパラメータ部分を結合・分解するクラスを作ってみた。前記の内容とは一線を画すので、互換性は一切考えていない。
pySerialは必須。
なお単体で試せるようにデモコードを付けておいた。ターゲットはXM/XH/XDシリーズ、IDは1~5(少なくともID=1は必須)、ボーレートは3Mbpsを想定、デバッグ用にpsutilが必要、マルチスレッドでアクセスといったところだ。プロトコルV1に対応したDynamixelの場合は末尾のコメントになっているコードを利用してもらえればと。
一応関数に与えられたパラメータを確認しているが、全領域で検証したものではない。また値の正負については勝手に判断している部分もあるので、都合が悪ければ適宜修正してもらえればと思う。
#!/usr/bin/python3 # -*- coding: utf-8 -*- # # pyDXL.py # SPDX-License-Identifier: MIT # SPDX-FileCopyrightText: (C) 2024 mukyokyo import serial, threading, multiprocessing, array, struct from typing import Union from collections import namedtuple from struct import pack, unpack, iter_unpack ########################################################## # Functionalized the part of converting int to bytes. # If specified as a tuple, it is converted to bytes at once. ########################################################## def B2Bs(d) -> bytes: if isinstance(d, list) or isinstance(d, tuple): return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d) else: return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d)) def W2Bs(d) -> bytes: if isinstance(d, list) or isinstance(d, tuple): return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]]) else: return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d)) def L2Bs(d) -> bytes: if isinstance(d, list) or isinstance(d, tuple): return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]]) else: return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d)) ########################################################## # API for Dynamixel protocol V1 ########################################################## class DXLProtocolV1: BROADCASTING_ID = 0xfe INST_PING = 0x01 INST_READ = 0x02 INST_WRITE = 0x03 INST_REG_WRITE = 0x04 INST_ACTION = 0x05 INST_FACTORY_RESET = 0x06 INST_REBOOT = 0x08 INST_SYNC_WRITE = 0x83 INST_SYNG_REG_WRITE = 0x85 TSyncW = namedtuple("TSyncW", ("id", ("data"))) def __init__(self, port : Union[serial.Serial, str], baudrate = 57600, timeout = 0.05, lock = None): """ Initalize parameters ------------- port : str Device name baudrate : int Serial baudrate[bps] timeout : float Read timeout[s] """ if isinstance(port, serial.Serial): self.__serial = port self.__baudrate = port.baudrate self.__timeout = port.timeout else: self.__serial = serial.Serial(port, baudrate = baudrate, timeout = timeout) self.__baudrate = self.__serial.baudrate self.__timeout = self.__serial.timeout if lock == None: self.__lock = threading.Lock() else: self.__lock = lock self.__Error = 0 @property def lock(self): return self.__lock @property def baudrate(self): return self.__serial.baudrate @baudrate.setter def baudrate(self, baudrate): self.__baudrate = baudrate self.__serial.baudrate = baudrate @property def timeout(self): return self.__serial.timeout @timeout.setter def timeout(self, timeout): self.__timeout = timeout self.__serial.timeout = timeout def __reconfig(self): self.__serial.baudrate = self.__baudrate self.__serial.timeout = self.__timeout @property def Error(self): return self.__Error def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> (bytes, bool): """ Sending packets parameters ------------- id : int Target ID inst : int Instruction command param : bytes Packet parameters Returns ------- bytes Packets sent bool Success or failure """ self.__reconfig() if ((id == self.BROADCASTING_ID) or (id >= 0 and id <= 253)) and len(param) <= (256 - 6): instp = bytearray([0xff,0xff,id,0,inst]) + bytes(param) instp[3] = len(instp) - 3 instp += B2Bs(~sum(instp[2:]) & 0xff) self.__serial.reset_input_buffer() if echo: print('TX:', instp.hex(':')) self.__serial.write(instp) return bytes(instp), True return None, False def __rx(self, length) -> bytes: s = self.__serial.read(length) l = len(s) if l == length: return s else: r = s length -= l if length > 0: while self.__serial.in_waiting > 0: s = self.__serial.read(length) r += s length -= len(s) if length == 0: break return r def RxPacket(self, echo = False) -> (bytes, bool): """ Receiving packets Returns ------- bytes Packets received bool Success or failure """ statp = self.__rx(5) if statp: if len(statp) == 5: if statp[0] == 0xff and statp[1] == 0xff: l = statp[3] - 1 self.__Error = statp[4] statp += self.__rx(l) if len(statp) == l + 5: if statp[-1:][0] == ((~sum(statp[2:-1])) & 0xff): if echo: print('RX:', statp.hex(':')) return bytes(statp), (statp[4] & 0x40) == 0 return None, False def Write(self, id : int, addr : int, data : bytes, echo = False) -> bool: """ Write instruction parameters ------------- id : int Target ID addr : int Target item address data : bytes Data to be written Returns ------- result : bool Success or failure """ with self.__lock: if id >= 0 and id <= self.BROADCASTING_ID and addr >= 0 and addr <= 254: if self.TxPacket(id, self.INST_WRITE, B2Bs(addr) + data, echo)[1]: if id != self.BROADCASTING_ID: dat, r = self.RxPacket(echo) if r: return dat[2] == id and (dat[4] & 0x18) == 0 else: return True return False def Write8(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: return self.Write(id, addr, B2Bs(data), echo) def Write16(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: return self.Write(id, addr, W2Bs(data), echo) def Write32(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: return self.Write(id, addr, L2Bs(data), echo) def Read(self, id : int, addr : int, length : int, echo = False) -> bytes: """ Read instruction parameters ------------- id : int Target ID addr : int Target item address length : int Number of bytes to read Returns ------- bytes Data read """ with self.__lock: if id >= 0 and id <= 253 and addr >= 0 and addr <= 254 and length > 0 and length <= (256 - 6): if self.TxPacket(id, self.INST_READ, B2Bs((addr, length)), echo)[1]: dat, r = self.RxPacket(echo) if r: if dat[2] == id and (dat[4] & 0x8) == 0: return bytes(dat[5:-1]) return None def Read8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.Read(id, addr, length, echo) if r != None: n = sum(iter_unpack('b' if signed else 'B', r), ()) return n if length > 1 else n[0] return None def Read16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.Read(id, addr, 2 << (length - 1), echo) if r != None: n = sum(iter_unpack('h' if signed else 'H', r), ()) return n if length > 1 else n[0] return None def Read32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.Read(id, addr, 4 << (length - 1), echo) if r != None: n = sum(iter_unpack('i' if signed else 'I', r), ()) return n if length > 1 else n[0] return None def SyncWrite(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool: """ Sync Write instruction parameters ------------- addr : int Target item address length : int Number of bytes to write id_datas : (TSyncW) Target ID and data Returns ------- bool Success or failure """ with self.__lock: if addr >= 0 and addr <= 254 and length > 0 and length < (256 - 6): param = B2Bs((addr,length)) for d in id_datas: param += B2Bs(d.id) + d.data if len(d.data) != length or d.id < 0 or d.id > 253: del param return False return self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)[1] return False def Ping(self, id : int, echo = False) -> bool: with self.__lock: if self.TxPacket(id, self.INST_PING, bytes(), echo)[1]: dat, r = self.RxPacket(echo) if r: return id == dat[2] and dat[3] == 2 return False def FactoryReset(self, id : int, echo = False) -> bool: with self.__lock: if self.TxPacket(id, self.INST_FACTORY_RESET, bytes(), echo)[1]: dat, r = self.RxPacket(echo) if r: return id == dat[2] and dat[3] == 2 return False def Reboot(self, id : int, echo = False) -> bool: with self.__lock: if self.TxPacket(id, self.INST_REBOOT, bytes(), echo)[1]: dat, r = self.RxPacket(echo) if r: return id == dat[2] and dat[3] == 2 return False ########################################################## # API for Dynamixel protocol V2 ########################################################## class DXLProtocolV2: BROADCASTING_ID = 0xfe INST_PING = 0x01 INST_READ = 0x02 INST_WRITE = 0x03 INST_REG_WRITE = 0x04 INST_ACTION = 0x05 INST_FACTORY_RESET = 0x06 INST_REBOOT = 0x08 INST_SYS_WRITE = 0x0d INST_CLEAR = 0x10 INST_CONTROL_TABLE_BACKUP = 0x20 INST_STATUS = 0x55 INST_SYNC_READ = 0x82 INST_SYNC_WRITE = 0x83 INST_SYNG_REG_WRITE = 0x85 INST_FAST_SYNC_READ = 0x8a INST_BULK_READ = 0x92 INST_BULK_WRITE = 0x93 INST_FAST_BULK_READ = 0x9a TSyncW = namedtuple("TSyncW", ("id", ("data"))) TBulkW = namedtuple("TBulkW", ("id", "addr", ("data"))) TBulkR = namedtuple("TBulkR", ("id", "addr", "length")) __crc16_lutable = array.array('H') def __init__(self, port : Union[serial.Serial, str], baudrate = 57600, timeout = 0.05, lock = None): """ Initalize parameters ------------- port : str Device name baudrate : int Serial baudrate[bps] timeout : float Read timeout[s] """ if isinstance(port, serial.Serial): self.__serial = port self.__baudrate = port.baudrate self.__timeout = port.timeout else: self.__serial = serial.Serial(port, baudrate = baudrate, timeout = timeout) self.__baudrate = self.__serial.baudrate self.__timeout = self.__serial.timeout if lock == None: self.__lock = threading.Lock() else: self.__lock = lock self.__Error = 0 poly = 0x8005 for i in range(256): nData = i << 8 nAccum = 0 for j in range(8): nAccum = ((nAccum << 1) ^ poly if (nData ^ nAccum) & 0x8000 else nAccum << 1) & 0xffff nData <<= 1 self.__crc16_lutable.append(nAccum) @property def lock(self): return self.__lock @property def baudrate(self): return self.__serial.baudrate @baudrate.setter def baudrate(self, baudrate): self.__baudrate = baudrate self.__serial.baudrate = baudrate @property def timeout(self): return self.__serial.timeout @timeout.setter def timeout(self, timeout): self.__timeout = timeout self.__serial.timeout = timeout def __reconfig(self): self.__serial.baudrate = self.__baudrate self.__serial.timeout = self.__timeout @property def Error(self): return self.__Error def __crc16(self, data : bytes) -> int: crc = 0 for d in data: crc = (crc << 8) ^ self.__crc16_lutable[(((crc >> 8) ^ d) & 0xff)] return crc & 0xffff def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> (bytes, bool): """ Sending packets parameters ------------- id : int Target ID inst : int Instruction command param : bytes Packet parameters Returns ------- bytes Packets sent bool Success or failure """ self.__reconfig() if ((id == self.BROADCASTING_ID) or (id >= 0 and id <= 252)) and len(param) < (65536 - 10): instp = bytearray([0xff,0xff,0xfd,0x00,id,0,0,inst]) + bytearray(param).replace(b'\xff\xff\xfd',b'\xff\xff\xfd\xfd') instp[5:7] = W2Bs(len(instp) - 5) instp += W2Bs(self.__crc16(instp)) self.__serial.reset_input_buffer() if echo: print('TX:', instp.hex(':')) self.__serial.write(instp) return bytes(instp), True return None, False def __rx(self, length) -> bytes: s = self.__serial.read(length) l = len(s) if l == length: return s else: r = s length -= l if length > 0: while self.__serial.in_waiting > 0: s = self.__serial.read(length) r += s length -= len(s) if length == 0: break return r def RxPacket(self, echo = False) -> (bytes, bool): """ Receiving packets Returns ------- bytes Packets received bool Success or failure """ self.__serial.flush() statp = self.__rx(9) if statp: if len(statp) == 9: if statp[0] == 0xff and statp[1] == 0xff and statp[2] == 0xfd and statp[3] == 0 and statp[7] == 0x55: l = unpack('<H', statp[5:7])[0] - 2 self.__Error = statp[8] statp += self.__rx(l) if len(statp) == l + 9: if unpack('<H', statp[-2:])[0] == self.__crc16(statp[:-2]): statp = statp[0:9] + statp[9:].replace(b'\xff\xff\xfd\xfd',b'\xff\xff\xfd') if echo: print('RX:', statp.hex(':')) return bytes(statp), (statp[8] & 0x7f) == 0 return None, False def Write(self, id : int, addr : int, data : bytes, echo = False) -> bool: """ Write instruction parameters ------------- id : int Target ID addr : int Target item address data : bytes Data to be written Returns ------- result : bool Success or failure """ with self.__lock: if ((id >= 0 and id <= 252) or (id == self.BROADCASTING_ID)) and addr >= 0 and addr <= 65535: if self.TxPacket(id, self.INST_WRITE, W2Bs(addr) + data, echo)[1]: if id != self.BROADCASTING_ID: dat, r = self.RxPacket(echo) if r: return dat[4] == id and (dat[8] & 0x7f) == 0 else: return True return False def Write8(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: return self.Write(id, addr, B2Bs(data), echo) def Write16(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: return self.Write(id, addr, W2Bs(data), echo) def Write32(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: return self.Write(id, addr, L2Bs(data), echo) def Read(self, id : int, addr : int, length : int, echo = False) -> bytes: """ Read instruction parameters ------------- id : int Target ID addr : int Target item address length : int Number of bytes to read Returns ------- bytes Data read bool Success or failure """ with self.__lock: if id >= 0 and id <= 252 and addr >= 0 and addr <= 65535 and length > 0 and length < (65536 - 10): if self.TxPacket(id, self.INST_READ, W2Bs(addr) + W2Bs(length), echo)[1]: dat, r = self.RxPacket(echo) if r: return bytes(dat[9:-2]) return None def Read8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.Read(id, addr, length, echo) if r != None: n = sum(iter_unpack('b' if signed else 'B', r), ()) return n if length > 1 else n[0] return None def Read16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.Read(id, addr, 2 << (length - 1), echo) if r != None: n = sum(iter_unpack('h' if signed else 'H', r), ()) return n if length > 1 else n[0] return None def Read32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.Read(id, addr, 4 << (length - 1), echo) if r != None: n = sum(iter_unpack('i' if signed else 'I', r), ()) return n if length > 1 else n[0] return None def SyncWrite(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool: """ Sync Write instruction parameters ------------- addr : int Target item address length : int Number of bytes to write id_datas : (TSyncW) Target ID and data Returns ------- bool Success or failure """ with self.__lock: if addr >= 0 and addr <= 65535 and length > 0 and length < (65536 - 10): param = W2Bs(addr) + W2Bs(length) for d in id_datas: param += bytes((d.id,)) + d.data if len(d.data) != length or d.id < 0 or d.id > 252: del param return False return self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)[1] return False def SyncRead(self, addr : int, length : int, ids : (int), echo = False) -> tuple: """ Sync Read instruction parameters ------------- addr : int Target item address length : int Number of bytes to read ids : (int) Target IDs Returns ------- tuple Read ID and data """ with self.__lock: result = () if addr >= 0 and addr < 65535 and length > 0 and length < (65536 - 10): if self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_READ, W2Bs(addr) + W2Bs(length) + B2Bs(ids), echo)[1]: for id in ids: dat, r = self.RxPacket(echo) if r: if dat[4] == id: result += (id, bytes(dat[9:9 + length])), else: result += (id, bytes([])), else: result += (id, bytes([])), return result def BulkWrite(self, data : (TBulkW), echo = False) -> bool: """ Bulk Write instruction parameters ------------- data : (TBulkW) Target ID and address, data ids : (TSyncW) Target ID and data Returns ------- bool Success or failure """ with self.__lock: param = bytes() for d in data: if d.id >= 0 and d.id <= 252 and d.addr >= 0 and d.addr <= 65535: param += B2Bs(d.id) + W2Bs(d.addr) + W2Bs(len(d.data)) + d.data else: del param return False return self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_WRITE, param, echo)[1] def BulkRead(self, data:(TBulkR), echo = False) -> tuple: """ Bulk Read instruction parameters ------------- data : (TBulkR) ID, address, and number of bytes to be read Returns ------- tuple Read ID and data """ with self.__lock: result = () param = bytes() for d in data: if d.id < 0 or d.addr < 0 or d.length < 0: return result param += B2Bs(d.id) + W2Bs(d.addr) + W2Bs(d.length) if self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_READ, param, echo)[1]: for d in data: rxd, r = self.RxPacket(echo) if r: if(d.id == rxd[4]): result += (d.id, bytes(rxd[9:-2])), else: result += (d.id, bytes([])), else: result += (d.id, bytes([])), del param return result def Ping(self, id : int, echo = False) -> bool: with self.__lock: if self.TxPacket(id, self.INST_PING, bytes(), echo)[1]: rxd, r = self.RxPacket(echo) if r: return id == rxd[4] and rxd[5] == 7 and rxd[6] == 0 return False def FactoryReset(self, id : int, p1 : int, echo = False) -> bool: with self.__lock: if self.TxPacket(id, self.INST_FACTORY_RESET, bytes((p1,)), echo)[1]: rxd, r = self.RxPacket(echo) if r: return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0 return False def Reboot(self, id : int, echo = False) -> bool: with self.__lock: if self.TxPacket(id, self.INST_REBOOT, bytes(), echo)[1]: rxd, r = self.RxPacket(echo) if r: return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0 return False def Clear(self, id : int, val, echo = False) -> bool: with self.__lock: if self.TxPacket(id, self.INST_CLEAR, B2Bs(1) + L2Bs(val), echo)[1]: rxd, r = self.RxPacket(echo) if r: return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0 return False ########################################################## # test code ########################################################## if __name__ == "__main__": from contextlib import contextmanager from threading import Thread import time, gc, psutil, os ec = False ID = 1 fin = False @contextmanager def stopwatch(): start_time = time.time() yield #print('..proc time={:.1f}ms {}'.format((time.time() - start_time) * 1000, psutil.Process(os.getpid()).memory_info().rss)) def func1(dx): global ID, fin, ec try: ''' """ reset dxl """ with stopwatch(): r = dx.FactoryReset(ID, 0xff, echo = ec) print(f'FactoryReset({ID}): {r}') time.sleep(0.5) ''' """ ping dxl """ print(f'Ping({ID})=', dx.Ping(ID, echo = ec)) """ reboot dxl """ with stopwatch(): r = dx.Reboot(ID, echo = ec) print(f'Reboot({ID})={r}') time.sleep(0.5) """ basic packet proc """ with stopwatch(): with dx.mutex: r0 = dx.TxPacket(ID, dx.INST_WRITE, (65, 0, 1)) r1 = dx.RxPacket() print(f'TxPacket({ID})={r0[1]}', r0[0].hex(':')) if r1[0]: print(f'RxPacket({ID})={r1[1]}', r1[0].hex(':')) """ dump memory """ l = 50 for addr in range(0, 700, l): r = dx.Read(ID, addr, l, echo = ec) print(f'Read({addr};{l})=',r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!') """ read byte item """ with stopwatch(): r = dx.Read8(ID, 65, echo = ec) print(f'Read8({ID})={r}') """ sync read inst. """ print('SyncRead=') with stopwatch(): d = dx.SyncRead(0, 4, (1, 2, 3, 4, 5), echo = ec) for d in d: print(f' ({d[0]}) 0,4 hex=', d[1].hex(':') if len(d[1]) > 0 else ()) with stopwatch(): d = dx.SyncRead(132, 4, (1, 2, 3, 4, 5), echo = ec) for d in d: print(f' ({d[0]}) 132,4 int32=', unpack('<i', pack('<I', unpack('<I', d[1])[0]))[0] if len(d[1]) > 0 else ()) """ sync write inst. """ for i in range(30): with stopwatch(): dx.SyncWrite(65, 1, (dx.TSyncW(1, B2Bs(1)), dx.TSyncW(2, B2Bs(1)), dx.TSyncW(3, B2Bs(1)), dx.TSyncW(4, B2Bs(1)), dx.TSyncW(5, B2Bs(1))), echo = ec) time.sleep(0.05) with stopwatch(): dx.SyncWrite(65, 1, (dx.TSyncW(1, B2Bs(0)), dx.TSyncW(2, B2Bs(0)), dx.TSyncW(3, B2Bs(0)), dx.TSyncW(4, B2Bs(0)), dx.TSyncW(5, B2Bs(0))), echo = ec) time.sleep(0.05) """ set goal position """ #torque off if dx.Write8(ID, 64, 0, echo = ec): #multi turn if dx.Write8(ID, 11, 4, echo = ec): #torque on if dx.Write8(ID, 64, 1, echo = ec): print(f'Write32/Read32({ID})') for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)): for i in range(10): #set goal position with stopwatch(): dx.Write32(ID, 116, gp, echo = ec) #get present position with stopwatch(): pp = dx.Read32(ID, 132, signed = True, echo = ec) if pp != None: print(f' w={gp:6} r={pp:6} diff={gp-pp:5}', end='\r') else: pass print('None ', end='\r') break time.sleep(0.01) else: continue pass break print('') #torque off with stopwatch(): dx.Write8(ID, 64, 0, echo = ec) #normal turn with stopwatch(): dx.Write8(ID, 11, 3, echo = ec) """ block read/write (add/remove suffixes) """ with stopwatch(): r = dx.Read(ID, 120, 27, echo = ec) if r: print(f'Read({ID};128)=', tuple(iter_unpack('<HBBhhiiIIHB', r))[0]) fffffd = bytes(( 0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, )) with stopwatch(): r = dx.Write(ID, 634, fffffd, echo = ec) print(f'Write({ID})={r}') with stopwatch(): r = dx.Read(ID, 634, len(fffffd), echo = ec) if r: print(f'Read({ID})=', r.hex(':')) """ bulk read inst. """ with stopwatch(): d = dx.BulkRead((dx.TBulkR(1,0,10), dx.TBulkR(2,0,20), dx.TBulkR(3,0,30), dx.TBulkR(4,0,40), dx.TBulkR(5,0,40)), echo = ec) print('BulkRead=') for d in d: print(f' ({d[0]},0)', d[1].hex(':')) dx.Write8(ID, 64, 1, echo = ec) """ bulk write inst. """ print('BulkWrite=') with stopwatch(): print(' 1', dx.BulkWrite((dx.TBulkW(1,104, L2Bs((0,0,0,1024))), dx.TBulkW(2,65,B2Bs(0)), dx.TBulkW(3,65,B2Bs(0)), dx.TBulkW(4,65,B2Bs(0))), echo = ec)) time.sleep(0.5) with stopwatch(): print(' 2', dx.BulkWrite((dx.TBulkW(1,104, L2Bs(0)+L2Bs(0)+L2Bs(0)+L2Bs(2048)), dx.TBulkW(2,65,B2Bs(1)), dx.TBulkW(3,65,B2Bs(1)), dx.TBulkW(4,65,B2Bs(1))), echo = ec)) time.sleep(0.5) with stopwatch(): print(' 3', dx.BulkWrite((dx.TBulkW(1,104, L2Bs(0)+L2Bs(0)+L2Bs(0)+L2Bs(1024)), dx.TBulkW(2,65,B2Bs(0)), dx.TBulkW(3,65,B2Bs(0))), echo = ec)) time.sleep(0.5) with stopwatch(): print(' 4', dx.BulkWrite((dx.TBulkW(1,116, L2Bs(2048)), dx.TBulkW(2,65,B2Bs(1)), dx.TBulkW(3,65,B2Bs(1))), echo = ec)) time.sleep(0.5) with stopwatch(): print(' 5', dx.BulkWrite((dx.TBulkW(1,65,bytes(0)),dx.TBulkW(2,65,bytes(0)),dx.TBulkW(3,65,bytes(0))), echo = ec)) dx.Write8(ID, 64, 0, echo = ec) except Exception as ex: pass import traceback print('--- Caught Exception ---') traceback.print_exc() print('------------------------') ''' trace = [] tb = ex.__traceback__ while tb is not None: trace.append({ "filename": tb.tb_frame.f_code.co_filename, "name": tb.tb_frame.f_code.co_name, "lineno": tb.tb_lineno }) tb = tb.tb_next print(str({ 'type': type(ex).__name__, 'message': str(ex), 'trace': trace })) ''' fin = True def func2(dx): global ID, fin, ec while not fin: try: l = dx.Read8(ID, 65, echo = ec) if l != None: l ^= 1 dx.Write8(ID, 65, l, echo = ec) time.sleep(0.05) except Exception as ex: import traceback print('--- Caught Exception ---') traceback.print_exc() print('------------------------') return def func3(dx): global ID, fin, ec try: """ dump memory """ l = 20 for addr in range(0, 250, l): r = dx.Read(ID, addr, l, echo = ec) print(f'Read({addr};{l})=',r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!') ''' """ reset dxl """ with stopwatch(): r = dx.FactoryReset(ID, echo = ec) print(f'FactoryReset({ID}): {r}') time.sleep(0.5) ''' """ ping dxl """ print(f'Ping({ID})=', dx.Ping(ID, echo = ec)) """ reboot dxl """ with stopwatch(): r = dx.Reboot(ID, echo = ec) print(f'Reboot({ID})={r}') time.sleep(1) """ basic packet proc """ with stopwatch(): with dx.mutex: r0 = dx.TxPacket(ID, dx.INST_WRITE, (25, 1)) r1 = dx.RxPacket() print(f'TxPacket({ID})={r0[1]}', r0[0].hex(':')) if r1[0]: print(f'RxPacket({ID})={r1[1]}', r1[0].hex(':')) """ read byte item """ with stopwatch(): r = dx.Read8(ID, 25, echo = ec) print(f'Read8({ID})={r}') """ sync write inst. """ for i in range(30): with stopwatch(): dx.SyncWrite(25, 1, (dx.TSyncW(1, B2Bs(1)), dx.TSyncW(2, B2Bs(1)), dx.TSyncW(3, B2Bs(1))), echo = ec) time.sleep(0.05) with stopwatch(): dx.SyncWrite(25, 1, (dx.TSyncW(1, B2Bs(0)), dx.TSyncW(2, B2Bs(0)), dx.TSyncW(3, B2Bs(0))), echo = ec) time.sleep(0.05) """ set goal position """ #torque off if dx.Write8(ID, 24, 0, echo = ec): #multi turn if dx.Write16(ID, 6, (4095, 4095), echo = ec): #torque on if dx.Write8(ID, 24, 1, echo = ec): print(f'Write16/Read16({ID})') for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)): for i in range(10): #set goal position with stopwatch(): dx.Write16(ID, 30, gp, echo = ec) #get present position with stopwatch(): pp = dx.Read16(ID, 36, signed = True, echo = ec) if pp != None: print(f' w={gp:6} r={pp:6} diff={gp-pp:5}', end='\r') else: pass print('None ', end='\r') break time.sleep(0.005) else: continue pass break print('') #torque off dx.Write8(ID, 24, 0, echo = ec) #normal turn with stopwatch(): dx.Write16(ID, 6, (0, 4095), echo = ec) except Exception as ex: pass import traceback print('--- Caught Exception ---') traceback.print_exc() print('------------------------') import sys import platform print(sys.version) try: dx = DXLProtocolV2('\\\\.\\COM12', 3000000) except: pass else: th1 = Thread(target = func1, args = (dx,)) th2 = Thread(target = func2, args = (dx,)) th1.start() th2.start() th1.join() th2.join() del th1, th2, dx try: dx = DXLProtocolV1('\\\\.\\COM12', 57600) except: dx = None else: th3 = Thread(target = func3, args = (dx,)) th3.start() th3.join() del th3, dx print('fin')
Python2ついでにMicroPythonの互換性レベルを図ってみた。
欲しいライブラリが用意されていない以上に、基本的な部分が結構緩い。また実際に動かすとそれなりに遅い。それでもMicroPythonは存在しているし、個人的にはかつてのBASICの感覚が蘇ってきたので、ちょっとした使い方であれば選択肢に入れても良いかなと。
ひとまずPicoSHIELD+Raspberry Pi Pico上で動くように修正してみた。オーバーロックしているのはご愛敬として、マルチスレッドを除外した程度で実質PC版と大差無い。@micropython.viperを多用した時はかなり高速化できていたのだが、ここはコードの互換性を優先。
# pyDXL.py for MicroPython(RP2040) # SPDX-License-Identifier: MIT # SPDX-FileCopyrightText: (C) 2024 mukyokyo from machine import UART, Pin import time, array from collections import namedtuple from struct import pack, unpack ########################################################## # Functionalized the part of converting int to bytes. # If specified as a tuple, it is converted to bytes at once. ########################################################## @micropython.native def B2Bs(d) -> bytes: if isinstance(d, list) or isinstance(d, tuple): return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d) else: return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d)) @micropython.native def W2Bs(d) -> bytes: if isinstance(d, list) or isinstance(d, tuple): return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]]) else: return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d)) @micropython.native def L2Bs(d) -> bytes: if isinstance(d, list) or isinstance(d, tuple): return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]]) else: return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d)) ########################################################## # API for Dynamixel protocol V1 ########################################################## class DXLProtocolV1: BROADCASTING_ID = 0xfe INST_PING = 0x01 INST_READ = 0x02 INST_WRITE = 0x03 INST_REG_WRITE = 0x04 INST_ACTION = 0x05 INST_FACTORY_RESET = 0x06 INST_REBOOT = 0x08 INST_SYNC_WRITE = 0x83 INST_SYNG_REG_WRITE = 0x85 TSyncW = namedtuple("TSyncW", ("id", ("data"))) def __init__(self, baudrate = 57600, timeout_ms = 50): """ Initalize parameters ------------- port : int UART no. baudrate : int Serial baudrate[bps] timeout : float Read timeout[s] """ self.__Error = 0 self.__baudrate = baudrate self.__timeout = timeout_ms self.__serial = UART(1, baudrate, tx = Pin(4), rx = Pin(5), txbuf = 500, rxbuf = 500, timeout = timeout_ms) @property def baudrate(self): return self.__baudrate @baudrate.setter def baudrate(self, baudrate): self.__baudrate = baudrate self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout) @property def timeout(self): return self.__timeout @timeout.setter def timeout(self, timeout_ms): self.__timeout = timeout_ms self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout) @property def Error(self): return self.__Error @micropython.native def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> (bytes, bool): """ Sending packets parameters ------------- id : int Target ID inst : int Instruction command param : bytes Packet parameters Returns ------- bytes Packets sent bool Success or failure """ if ((id == self.BROADCASTING_ID) or (id >= 0 and id <= 253)) and len(param) <= (256 - 6): instp = bytearray([0xff,0xff,id,0,inst]) + bytes(param) instp[3] = len(instp) - 3 instp += B2Bs(~sum(instp[2:]) & 0xff) _ = self.__serial.read(self.__serial.any()) if echo: print('TX:', instp.hex(':')) self.__serial.write(instp) return bytes(instp), True return None, False @micropython.native def __rx(self, length) -> bytes: s = self.__serial.read(length) if s: l = len(s) if l == length: return s else: r = s length -= l while self.__serial.any() > 0: s = self.__serial.read(length) r += s length -= len(s) if length == 0: break return r else: return bytes() @micropython.native def RxPacket(self, echo = False) -> (bytes, bool): """ Receiving packets Returns ------- bytes Packets received bool Success or failure """ self.__serial.flush() statp = self.__rx(5) if statp: if len(statp) == 5: l = statp[3] - 1 self.__Error = statp[4] statp += self.__rx(l) if len(statp) == l + 5: if statp[-1:][0] == ((~sum(statp[2:-1])) & 0xff): if echo: print('RX:', statp.hex(':')) return bytes(statp), (statp[4] & 0x40) == 0 return None, False @micropython.native def Write(self, id : int, addr : int, data : bytes, echo = False) -> bool: """ Write instruction parameters ------------- id : int Target ID addr : int Target item address data : bytes Data to be written Returns ------- result : bool Success or failure """ if id >= 0 and id <= self.BROADCASTING_ID and addr >= 0 and addr <= 254: if self.TxPacket(id, self.INST_WRITE, B2Bs(addr) + data, echo)[1]: if id != self.BROADCASTING_ID: dat, r = self.RxPacket(echo) if r: return dat[2] == id and (dat[4] & 0x18) == 0 else: return True return False @micropython.native def Write8(self, id : int, addr : int, data : int | tuple | list, echo = False) -> bool: return self.Write(id, addr, B2Bs(data), echo) @micropython.native def Write16(self, id : int, addr : int, data : int | tuple | list, echo = False) -> bool: return self.Write(id, addr, W2Bs(data), echo) @micropython.native def Write32(self, id : int, addr : int, data : int | tuple | list, echo = False) -> bool: return self.Write(id, addr, L2Bs(data), echo) @micropython.native def Read(self, id : int, addr : int, length : int, echo = False) -> (bytes, bool): """ Read instruction parameters ------------- id : int Target ID addr : int Target item address length : int Number of bytes to read Returns ------- bytes Data read bool Success or failure """ if id >= 0 and id <= 253 and addr >= 0 and addr <= 254 and length > 0 and length <= (256 - 6): if self.TxPacket(id, self.INST_READ, B2Bs((addr, length)), echo)[1]: dat, r = self.RxPacket(echo) if r: if dat[2] == id and (dat[4] & 0x8) == 0: return bytes(dat[5:-1]) return None @micropython.native def Read8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.Read(id, addr, length, echo) if r != None: n = unpack('<' + (('b'*length) if signed else ('B'*length)), r[0:1 * length]) return n if length > 1 else n[0] return None @micropython.native def Read16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.Read(id, addr, 2 * length, echo) if r != None: n = unpack('<' + (('h'*length) if signed else ('H'*length)), r[0:2 * length]) return n if length > 1 else n[0] return None @micropython.native def Read32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.Read(id, addr, 4 * length, echo) if r != None: n = unpack('<' + (('i'*length) if signed else ('I'*length)), r[0:4 * length]) return n if length > 1 else n[0] return None @micropython.native def SyncWrite(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool: """ Sync Write instruction parameters ------------- addr : int Target item address length : int Number of bytes to write id_datas : (TSyncW) Target ID and data Returns ------- bool Success or failure """ if addr >= 0 and addr <= 254 and length > 0 and length < (256 - 6): param = B2Bs((addr,length)) for d in id_datas: param += B2Bs(d.id) + d.data if len(d.data) != length or d.id < 0 or d.id > 253: del param return False del param return self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)[1] return False @micropython.native def Ping(self, id : int, echo = False) -> bool: if self.TxPacket(id, self.INST_PING, bytes(), echo): dat, r = self.RxPacket(echo) if r: return id == dat[2] and dat[3] == 2 return False @micropython.native def FactoryReset(self, id : int, echo = False) -> bool: if self.TxPacket(id, self.INST_RESET, bytes(), echo): dat, r = self.RxPacket(echo) if r: return id == dat[2] and dat[3] == 2 return False @micropython.native def Reboot(self, id : int, echo = False) -> bool: if self.TxPacket(id, self.INST_REBOOT, bytes(), echo): dat, r = self.RxPacket(echo) if r: return id == dat[2] and dat[3] == 2 return False ########################################################## # API for Dynamixel protocol V2 ########################################################## class DXLProtocolV2: BROADCASTING_ID = const(0xfe) INST_PING = const(0x01) INST_READ = const(0x02) INST_WRITE = const(0x03) INST_REG_WRITE = const(0x04) INST_ACTION = const(0x05) INST_FACTORY_RESET = const(0x06) INST_REBOOT = const(0x08) INST_SYS_WRITE = const(0x0d) INST_CLEAR = const(0x10) INST_CONTROL_TABLE_BACKUP = const(0x20) INST_STATUS = const(0x55) INST_SYNC_READ = const(0x82) INST_SYNC_WRITE = const(0x83) INST_SYNG_REG_WRITE = const(0x85) INST_FAST_SYNC_READ = const(0x8a) INST_BULK_READ = const(0x92) INST_BULK_WRITE = const(0x93) INST_FAST_BULK_READ = const(0x9a) TSyncW = namedtuple("TSyncW", ("id", ("data"))) TBulkW = namedtuple("TBulkW", ("id", "addr", ("data"))) TBulkR = namedtuple("TBulkR", ("id", "addr", "length")) __crc16_lutable = array.array('H', ( 0x0000, 0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014, 0x8011, 0x8033, 0x0036, 0x003C, 0x8039, 0x0028, 0x802D, 0x8027, 0x0022, 0x8063, 0x0066, 0x006C, 0x8069, 0x0078, 0x807D, 0x8077, 0x0072, 0x0050, 0x8055, 0x805F, 0x005A, 0x804B, 0x004E, 0x0044, 0x8041, 0x80C3, 0x00C6, 0x00CC, 0x80C9, 0x00D8, 0x80DD, 0x80D7, 0x00D2, 0x00F0, 0x80F5, 0x80FF, 0x00FA, 0x80EB, 0x00EE, 0x00E4, 0x80E1, 0x00A0, 0x80A5, 0x80AF, 0x00AA, 0x80BB, 0x00BE, 0x00B4, 0x80B1, 0x8093, 0x0096, 0x009C, 0x8099, 0x0088, 0x808D, 0x8087, 0x0082, 0x8183, 0x0186, 0x018C, 0x8189, 0x0198, 0x819D, 0x8197, 0x0192, 0x01B0, 0x81B5, 0x81BF, 0x01BA, 0x81AB, 0x01AE, 0x01A4, 0x81A1, 0x01E0, 0x81E5, 0x81EF, 0x01EA, 0x81FB, 0x01FE, 0x01F4, 0x81F1, 0x81D3, 0x01D6, 0x01DC, 0x81D9, 0x01C8, 0x81CD, 0x81C7, 0x01C2, 0x0140, 0x8145, 0x814F, 0x014A, 0x815B, 0x015E, 0x0154, 0x8151, 0x8173, 0x0176, 0x017C, 0x8179, 0x0168, 0x816D, 0x8167, 0x0162, 0x8123, 0x0126, 0x012C, 0x8129, 0x0138, 0x813D, 0x8137, 0x0132, 0x0110, 0x8115, 0x811F, 0x011A, 0x810B, 0x010E, 0x0104, 0x8101, 0x8303, 0x0306, 0x030C, 0x8309, 0x0318, 0x831D, 0x8317, 0x0312, 0x0330, 0x8335, 0x833F, 0x033A, 0x832B, 0x032E, 0x0324, 0x8321, 0x0360, 0x8365, 0x836F, 0x036A, 0x837B, 0x037E, 0x0374, 0x8371, 0x8353, 0x0356, 0x035C, 0x8359, 0x0348, 0x834D, 0x8347, 0x0342, 0x03C0, 0x83C5, 0x83CF, 0x03CA, 0x83DB, 0x03DE, 0x03D4, 0x83D1, 0x83F3, 0x03F6, 0x03FC, 0x83F9, 0x03E8, 0x83ED, 0x83E7, 0x03E2, 0x83A3, 0x03A6, 0x03AC, 0x83A9, 0x03B8, 0x83BD, 0x83B7, 0x03B2, 0x0390, 0x8395, 0x839F, 0x039A, 0x838B, 0x038E, 0x0384, 0x8381, 0x0280, 0x8285, 0x828F, 0x028A, 0x829B, 0x029E, 0x0294, 0x8291, 0x82B3, 0x02B6, 0x02BC, 0x82B9, 0x02A8, 0x82AD, 0x82A7, 0x02A2, 0x82E3, 0x02E6, 0x02EC, 0x82E9, 0x02F8, 0x82FD, 0x82F7, 0x02F2, 0x02D0, 0x82D5, 0x82DF, 0x02DA, 0x82CB, 0x02CE, 0x02C4, 0x82C1, 0x8243, 0x0246, 0x024C, 0x8249, 0x0258, 0x825D, 0x8257, 0x0252, 0x0270, 0x8275, 0x827F, 0x027A, 0x826B, 0x026E, 0x0264, 0x8261, 0x0220, 0x8225, 0x822F, 0x022A, 0x823B, 0x023E, 0x0234, 0x8231, 0x8213, 0x0216, 0x021C, 0x8219, 0x0208, 0x820D, 0x8207, 0x0202 )) def __init__(self, baudrate = 57600, timeout_ms = 50): """ Initalize parameters ------------- port : int UART no. baudrate : int Serial baudrate[bps] timeout : float Read timeout[s] """ self.__Error = 0 self.__baudrate = baudrate self.__timeout = timeout_ms self.__serial = UART(1, baudrate, tx = Pin(4), rx = Pin(5), txbuf = 500, rxbuf = 500, timeout = timeout_ms) @property def baudrate(self): return self.__baudrate @baudrate.setter def baudrate(self, baudrate): self.__baudrate = baudrate self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout) @property def timeout(self): return self.__timeout @timeout.setter def timeout(self, timeout_ms): self.__timeout = timeout_ms self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout) @property def Error(self): return self.__Error @micropython.viper def __crc16(self, dat : ptr8, l:int) -> int: crc:int = 0 pt = ptr16(self.__crc16_lutable) for i in range(l): crc = (crc << 8) ^ pt[(((crc >> 8) ^ dat[i]) & 0xff)] return crc & 0xffff @micropython.native def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> (bytes, bool): """ Sending packets parameters ------------- id : int Target ID inst : int Instruction command param : bytes Packet parameters Returns ------- bytes Packets sent bool Success or failure """ if ((id == self.BROADCASTING_ID) or (id >= 0 and id <= 252)) and len(param) < (65536 - 10): instp = bytearray([0xff,0xff,0xfd,0x00,id,0,0,inst]) + bytearray(param).replace(b'\xff\xff\xfd',b'\xff\xff\xfd\xfd') instp[5:7] = W2Bs(len(instp) - 5) instp += W2Bs(self.__crc16(instp, len(instp))) _ = self.__serial.read(self.__serial.any()) if echo: print('TX:', instp.hex(':')) self.__serial.write(instp) return bytes(instp), True return None, False @micropython.native def __rx(self, length) -> bytes: s = self.__serial.read(length) if s: l = len(s) if l == length: return s else: r = s length -= l while self.__serial.any() > 0: s = self.__serial.read(length) r += s length -= len(s) if length == 0: break return r else: return bytes() @micropython.native def RxPacket(self, echo = False) -> (bytes, bool): """ Receiving packets Returns ------- bytes Packets received bool Success or failure """ self.__serial.flush() statp = self.__rx(9) if statp: if len(statp) == 9: if statp[0] == 0xff and statp[1] == 0xff and statp[2] == 0xfd and statp[3] == 0 and statp[7] == 0x55: l = unpack('<H', statp[5:7])[0] - 2 self.__Error = statp[8] statp += self.__rx(l) if len(statp) == l + 9: if unpack('<H', statp[-2:])[0] == self.__crc16(statp[:-2], len(statp[:-2])): statp = statp[0:9] + statp[9:].replace(b'\xff\xff\xfd\xfd',b'\xff\xff\xfd') if echo: print('RX:', statp.hex(':')) return bytes(statp), (statp[8] & 0x7f) == 0 return None, False @micropython.native def Write(self, id : int, addr : int, data: bytes, echo = False) -> bool: """ Write instruction parameters ------------- id : int Target ID addr : int Target item address data : bytes Data to be written Returns ------- result : bool Success or failure """ if ((id >= 0 and id <= 252) or (id == self.BROADCASTING_ID)) and addr >= 0 and addr <= 65535: if self.TxPacket(id, self.INST_WRITE, W2Bs(addr) + data, echo)[1]: if id != self.BROADCASTING_ID: dat, r = self.RxPacket(echo) if r: return dat[4] == id and (dat[8] & 0x7f) == 0 else: return True return False @micropython.native def Write8(self, id : int, addr : int, data : int | tuple | list, echo = False): return self.Write(id, addr, B2Bs(data), echo) @micropython.native def Write16(self, id : int, addr : int, data : int | tuple | list, echo = False): return self.Write(id, addr, W2Bs(data), echo) @micropython.native def Write32(self, id : int, addr : int, data : int | tuple | list, echo = False): return self.Write(id, addr, L2Bs(data), echo) @micropython.native def Read(self, id : int, addr : int, length : int, echo = False) -> (bytes, bool): """ Read instruction parameters ------------- id : int Target ID addr : int Target item address length : int Number of bytes to read Returns ------- bytes Data read bool Success or failure """ if id >= 0 and id <= 252 and addr >= 0 and addr <= 65535 and length > 0 and length < (65536 - 10): if self.TxPacket(id, self.INST_READ, W2Bs(addr) + W2Bs(length), echo)[1]: dat, r = self.RxPacket(echo) if r: return bytes(dat[9:-2]) return None @micropython.native def Read8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.Read(id, addr, length, echo) if r: n = unpack('<' + (('b'*length) if signed else ('B'*length)), r[0:1 * length]) return n if length > 1 else n[0] return None @micropython.native def Read16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.Read(id, addr, 2 * length, echo) if r != None: n = unpack('<' + (('h'*length) if signed else ('H'*length)), r[0:2 * length]) return n if length > 1 else n[0] return None @micropython.native def Read32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.Read(id, addr, 4 * length, echo) if r != None: n = unpack('<' + (('i'*length) if signed else ('I'*length)), r[0:4 * length]) return n if length > 1 else n[0] return None @micropython.native def SyncWrite(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool: """ Sync Write instruction parameters ------------- addr : int Target item address length : int Number of bytes to write id_datas : (TSyncW) Target ID and data Returns ------- bool Success or failure """ if addr >= 0 and addr <= 65535 and length > 0 and length < (65536 - 10): param = W2Bs(addr) + W2Bs(length) for d in id_datas: param += bytes((d.id,)) + d.data if len(d.data) != length or d.id < 0 or d.id > 252: del param return False return self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)[1] return False @micropython.native def SyncRead(self, addr : int, length : int, ids : (int), echo = False) -> tuple: """ Sync Read instruction parameters ------------- addr : int Target item address length : int Number of bytes to read ids : (int) Target IDs Returns ------- tuple Read ID and data """ result = () if addr >= 0 and addr < 65535 and length > 0 and length < (65536 - 10): if self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_READ, W2Bs(addr) + W2Bs(length) + B2Bs(ids), echo)[1]: for id in ids: dat, r = self.RxPacket(echo) if r: if dat[4] == id: result += (id, bytes(dat[9:9 + length])), else: result += (id, bytes([])), else: result += (id, bytes([])), return result @micropython.native def BulkWrite(self, data : (TBulkW), echo = False) -> bool: """ Bulk Write instruction parameters ------------- addr : int Target item address length : int Number of bytes to write ids : (TSyncW) Target ID and data Returns ------- bool Success or failure """ param = bytes() for d in data: if d.id >= 0 and d.id <= 252 and d.addr >= 0 and d.addr <= 65535: param += B2Bs(d.id) + W2Bs(d.addr) + W2Bs(len(d.data)) + d.data else: del param return False return self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_WRITE, param, echo)[1] @micropython.native def BulkRead(self, data:(TBulkR), echo = False) -> tuple: """ Bulk Read instruction parameters ------------- data : (TBulkR) ID, address, and number of bytes to be read Returns ------- tuple Read ID and data """ result = () param = bytes() for d in data: if d.id < 0 or d.addr < 0 or d.length < 0: return result param += B2Bs(d.id) + W2Bs(d.addr) + W2Bs(d.length) if self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_READ, param, echo)[1]: for d in data: rxd, r = self.RxPacket(echo) if r: if(d.id == rxd[4]): result += (d.id, bytes(rxd[9:-2])), else: result += (d.id, bytes([])), else: result += (d.id, bytes([])), del param return result @micropython.native def Ping(self, id : int, echo = False) -> bool: if self.TxPacket(id, self.INST_PING, bytes(), echo)[1]: rxd, r = self.RxPacket(echo) if r: return id == rxd[4] and rxd[5] == 7 and rxd[6] == 0 return False @micropython.native def FactoryReset(self, id : int, p1 : int, echo = False) -> bool: if self.TxPacket(id, self.INST_FACTORY_RESET, B2Bs(p1), echo)[1]: rxd, r = self.RxPacket(echo) if r: return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0 return False @micropython.native def Reboot(self, id : int, echo = False) -> bool: if self.TxPacket(id, self.INST_REBOOT, bytes(), echo)[1]: rxd, r = self.RxPacket(echo) if r: return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0 return False @micropython.native def Clear(self, id : int, val, echo = False) -> bool: if self.TxPacket(id, self.INST_CLEAR, B2Bs(1) + L2Bs(val), echo)[1]: rxd, r = self.RxPacket(echo) if r: return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0 return False ########################################################## # test code ########################################################## if __name__ == "__main__": import uasyncio as asyncio from ucontextlib import contextmanager import gc try: led = Pin('LED', Pin.OUT) except TypeError: led = Pin(25, Pin.OUT) led.off() dx = None ec = False ID = 1 fin = False async def test1(): global dx, ec, ID, fin @contextmanager def stopwatch(): led.toggle() start_time = time.ticks_us() yield #print('..proc time={:.2f}ms {}'.format((time.ticks_us() - start_time) / 1000, gc.mem_alloc())) # Basic operation check try: ''' with stopwatch(): r = dx.FactoryReset(ID, echo=ec) print(f'Reset({ID}): {r}') dx.baudrate = 57600 ID = 1 await asyncio.sleep(0.5) ''' """ ping dxl """ for i in range(10): with stopwatch(): print(f'Ping({i}):', dx.Ping(i, echo=ec)) """ reboot dxl """ with stopwatch(): r = dx.Reboot(ID, echo=ec) print(f'Reboot({ID})= {r}') await asyncio.sleep(0.5) """ basic packet proc """ with stopwatch(): r0 = dx.TxPacket(ID, dx.INST_WRITE, (65, 0, 1)) r1 = dx.RxPacket() print('TxPacket({})= {} {}'.format(ID, r0[1], r0[0].hex(':') if r0[0] else '--')) print('RxPacket({})= {} {}'.format(ID, r1[1], r1[0].hex(':') if r1[0] else '--')) """ dump memory """ l = 50 for addr in range(0, 700, l): r = dx.Read(ID, addr, l, echo = ec) print(f'Read({addr};{l})=',r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!') """ read byte item """ with stopwatch(): r = dx.Read8(ID, 65, echo=ec) print(f'Read8({ID},65)= {r}') """ read word item """ with stopwatch(): r = dx.Read16(ID, 120, echo=ec) print(f'Read16({ID},120)= {r}') """ read long item """ with stopwatch(): r = dx.Read32(ID, 132, echo=ec) print(f'Read32({ID},132)= {r}') """ sync read inst. """ with stopwatch(): d = dx.SyncRead(0, 4, (1, 2, 3, 4, 5, 6, 7, 8), echo=ec) print('SyncRead=') for d in d: print(f' ({d[0]},0)', d[1].hex(':') if len(d[1]) > 0 else ()) """ sync write inst. """ for i in range(20): with stopwatch(): dx.SyncWrite(65, 1, (dx.TSyncW(1, B2Bs(1)), dx.TSyncW(2, B2Bs(1)), dx.TSyncW(3, B2Bs(1))), echo=ec) await asyncio.sleep(0.05) with stopwatch(): dx.SyncWrite(65, 1, (dx.TSyncW(1, B2Bs(0)), dx.TSyncW(2, B2Bs(0)), dx.TSyncW(3, B2Bs(0))), echo=ec) await asyncio.sleep(0.05) """ set goal position """ #torque off with stopwatch(): dx.Write8(ID, 64, 0, echo=ec) #multi turn with stopwatch(): dx.Write8(ID, 11, 4, echo=ec) #torque on with stopwatch(): dx.Write8(ID, 64, 1, echo=ec) for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)): #set goal potition with stopwatch(): dx.Write32(ID, 116, gp, echo=ec) #get present potition with stopwatch(): pp = dx.Read32(ID, 132, signed = True, echo=ec) if pp: print(f'Goal Pos={gp:6d}, Present Pos={pp:6d}', end='\r') else: break await asyncio.sleep(0.05) else: print('') #torque off with stopwatch(): dx.Write8(ID, 64, 0, echo=ec) #normal turn with stopwatch(): dx.Write8(ID, 11, 3, echo=ec) """ test add/remove suffixes """ fffffd = bytes(( 0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, )) with stopwatch(): r = dx.Write(ID, 634, fffffd, echo=ec) print(f'Write({ID},634)= {r}') with stopwatch(): r = dx.Read(ID, 634, len(fffffd), echo=ec) print(f'Read({ID},634)=', r.hex(':') if r else '--') """ bulk read inst. """ with stopwatch(): d = dx.BulkRead((dx.TBulkR(1, 0, 10), dx.TBulkR(2, 0, 20), dx.TBulkR(3, 0, 30), dx.TBulkR(4, 0, 40), dx.TBulkR(6, 0, 50)), echo=ec) print('BulkRead=') for d in d: print(f' ({d[0]},0)', d[1].hex(':')) """ bulk write inst. """ dx.Write8(ID, 64, 1, echo=ec) with stopwatch(): dx.BulkWrite((dx.TBulkW(1, 104, L2Bs((0,0,0,1024))), dx.TBulkW(2, 65, B2Bs(0)), dx.TBulkW(3, 65, B2Bs(0))), echo=ec) await asyncio.sleep(0.5) with stopwatch(): dx.BulkWrite((dx.TBulkW(1, 104, L2Bs(0)+L2Bs(0)+L2Bs(0)+L2Bs(2048)), dx.TBulkW(2, 65, B2Bs(1)), dx.TBulkW(3, 65, B2Bs(1))), echo=ec) await asyncio.sleep(0.5) with stopwatch(): dx.BulkWrite((dx.TBulkW(1, 104, L2Bs(0)+L2Bs(0)+L2Bs(0)+L2Bs(1024)), dx.TBulkW(2, 65, B2Bs(0)), dx.TBulkW(3, 65, B2Bs(0))), echo=ec) await asyncio.sleep(0.5) with stopwatch(): dx.BulkWrite((dx.TBulkW(1, 116, L2Bs(2048)), dx.TBulkW(2,65, B2Bs(1)), dx.TBulkW(3, 65, B2Bs(1))), echo=ec) await asyncio.sleep(0.5) with stopwatch(): dx.BulkWrite((dx.TBulkW(1, 65, B2Bs(0)), dx.TBulkW(2, 65, B2Bs(0)), dx.TBulkW(3, 65, B2Bs(0))), echo=ec) dx.Write8(ID, 64, 0, echo=ec) except Exception as ex: print('--- Caught Exception ---') import sys sys.print_exception(ex) print('------------------------') fin = True async def test2(): global dx, ec, ID, fin await asyncio.sleep(0.5) while not fin: try: dx.Write8(ID, 65, dx.Read8(ID, 65, echo=ec) ^ 1, echo=ec) await asyncio.sleep(0.02) except Exception as ex: print('--- Caught Exception ---') import sys sys.print_exception(ex) print('------------------------') def test3(): global dx, ec, ID, fin # Basic operation check print('reset=', dx.Reboot(1, echo=ec)) time.sleep(2) print('ping=', dx.Ping(1, echo=ec)) dx.TxPacket(1, dx.INST_READ, (0x2b, 0x01), True) dx.RxPacket(True) r = dx.Read(1, 0, 70, echo=ec) print(r.hex(',')) print('len=',len(r)) for i in range(15): led = dx.Read8(1, 25, echo=ec) ^ 1 dx.Write8(1, 25, led, echo=ec) print(f'led={led}', end = '\r') time.sleep(0.2) print() dx.Write8(1, 24, 0, echo=ec) dx.Write16(1, 6, (4095, 4095), echo=ec) for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)): dx.Write16(1, 30, gp, echo=ec) pp = dx.Read16(1, 36, signed=True, echo=ec) print(f'Goal Pos={gp:6d}, Present Pos={pp:6d}', end='\r') time.sleep(0.05) else: print('') dx.Write8(1, 24, 0, echo=ec) dx.Write16(1, 6, (0, 4095),echo=ec) machine.freq(250000000) # set the CPU frequency to 250 MHz dx = DXLProtocolV2(baudrate = 3000000) loop = asyncio.get_event_loop() loop.create_task(test1()) loop.create_task(test2()) loop.run_forever() time.sleep(1) dx = DXLProtocolV1(baudrate = 57600) test3()