1: 2021-02-08 (月) 18:56:29 takaboo ソース 現: 2024-01-09 (火) 23:24:03 takaboo ソース
Line 28: Line 28:
#ref(DXLSHARE/jst_B3B-EH.png) #ref(DXLSHARE/jst_B3B-EH.png)
| 端子番号 | 信号名 | |h | 端子番号 | 信号名 | |h
-|1 |GND |電源のマイナス側&br;シリアル通信のGNDと共有 | +|CENTER:60|CENTER:|LEFT:|c 
-|2 |VDD |電源のプラス側&br;大半のDYNAMIXELは12V | +|1|GND|電源のマイナス側&br;シリアル通信のGNDと共有 | 
-|3 |TTL Signal |シリアル通信の信号&br;双方向 |+|2|VDD|電源のプラス側&br;大半のDYNAMIXELは12V | 
 +|3|TTL Signal|シリアル通信の信号&br;双方向 |
-RS-485 -RS-485
| Pats Name | JST Parts Number |h | Pats Name | JST Parts Number |h
Line 38: Line 39:
#ref(DXLSHARE/jst_B4B-EH.png) #ref(DXLSHARE/jst_B4B-EH.png)
| 端子番号 | 信号名 | |h | 端子番号 | 信号名 | |h
-|1 |GND |電源のマイナス側&br;シリアル通信のGNDと共有 | +|CENTER:60|CENTER:|LEFT:|c 
-|2 |VDD |電源のプラス側&br;大半のDYNAMIXELはDC12V | +|1|GND|電源のマイナス側&br;シリアル通信のGNDと共有 | 
-|3 |RS-485 D+ |シリアル通信のプラス側差動信号&br;双方向 | +|2|VDD|電源のプラス側&br;大半のDYNAMIXELはDC12V | 
-|4 |RS-485 D- |シリアル通信のマイナス側差動信号&br;双方向 |+|3|RS-485 D+|シリアル通信のプラス側差動信号&br;双方向 | 
 +|4|RS-485 D-|シリアル通信のマイナス側差動信号&br;双方向 |
紹介したTTL及びRS-485の2種類のI/Fは一般的なPCには標準装備されていないため、USBポートを介して増設するための装置が用意されている。複数のI/Fに対応していたり、いずれか1つのI/Fにのみ対応していたり、電源の供給を行えたりとラインナップが複数あるため、以下に現行品のみの比較表を示す。 紹介したTTL及びRS-485の2種類のI/Fは一般的なPCには標準装備されていないため、USBポートを介して増設するための装置が用意されている。複数のI/Fに対応していたり、いずれか1つのI/Fにのみ対応していたり、電源の供給を行えたりとラインナップが複数あるため、以下に現行品のみの比較表を示す。
-|                    | [[USB2RS485 dongle>BTE079C]] | [[USB2TTL dongle>BTE080C]] | [[USB2DXIF dongle>BTE096]] | [[DXHUB>DXHUB2]]             | [[U2D2>https://emanual.robotis.com/docs/​en/parts/interface/u2d2/]] | +|                    | [[USB2RS485 dongle>BTE079C]] | [[USB2TTL dongle>BTE080C]] | [[USB2DXIF dongle>BTE096]] | [[USB2DXIF>BTE101]]    | [[Starter Kit A>BTH076]] | [[Starter Kit B>BTH077]] | [[PicoSHIELD]]                  | [[DXHUB>DXHUB2]]   | [[U2D2>https://emanual.robotis.com/docs/​en/parts/interface/u2d2/]] | 
-|~| #ref(http://www.besttechnology.co.jp/uploads/onli​neshop/photos/e00267e5f30aeb4b7bdf.png,1​5%) | #ref(http://www.besttechnology.co.jp/uploads/onli​neshop/photos/p00268p5f30af2505632.png,1​5%) | #ref(http://www.besttechnology.co.jp/uploads/onli​neshop/photos/n00269n5f30af0648685.png,1​5%) | #ref(http://www.besttechnology.co.jp/uploads/onli​neshop/photos/a00255a5dcac2c8eed34.png,1​5%) | #ref(http://www.besttechnology.co.jp/uploads/onli​neshop/middles/k00208k5a9cf31f42b33.png,​15%)| +|~| #ref(https://www.besttechnology.co.jp/uploads/onli​neshop/photos/e00267e5f30aeb4b7bdf.png,1​5%) | #ref(https://www.besttechnology.co.jp/uploads/onli​neshop/photos/p00268p5f30af2505632.png,1​5%) | #ref(https://www.besttechnology.co.jp/uploads/onli​neshop/photos/n00269n5f30af0648685.png,1​5%) | #ref(https://www.besttechnology.co.jp/uploads​/onlineshop/photos/k00291k61ff36e21ffa8.​png,15%) | #ref(https://www.besttechnology.co.jp/up​loads/onlineshop/photos/k00264k62d7c3f74​5615.png,15%) | #ref(https://www.besttechnology.co.jp/up​loads/onlineshop/photos/h00265h62d7fbc66​56a1.png,15%) | #ref(https://www.besttechnology.co.jp/up​loads/onlineshop/photos/i00297i640ec82d2​2466.png,15%) | #ref(https://www.besttechnology.co.jp/uploads/onli​neshop/photos/a00255a5dcac2c8eed34.png,1​5%) | #ref(https://www.besttechnology.co.jp/uploads/onli​neshop/middles/k00208k5a9cf31f42b33.png,​15%)| 
-|Manufacture        | BestTechnology   | <             | < | < | ROBOTIS          | +|Manufacture        | BestTechnology               | <                         | <                         | <                       | <                        | <                        | <                              | <                  | ROBOTIS          | 
-|USB             | USB2.0 FS&br;USB Type-A       | < | < | USB2.0 HS&br;USB Type-C             | USB2.0 HS&br;micro USB Type-B         +|USB                 | USB2.0 FS&br;USB Type-A     | <                         | <                         | USB2.0 FS&br;USB Type-C | USB2.0 FS&br;USB micro-B | USB2.0 FS&br;USB Type-C  | USB2.0 FS&br;micro USB Type-B | USB2.0 HS&br;USB Type-C | USB2.0 HS&br;micro USB Type-B | 
-|FTDI Chip          | [[FT234X>https://www.ftdichip.com/Produc​ts/ICs/FT234XD.html]]           | <             | <             | [[FT232H>https://www.ftdichip.com/Produc​ts/ICs/FT232H.htm]]           | <   +|FTDI Chip          | [[FT234X>https://www.ftdichip.com/Produc​ts/ICs/FT234XD.html]] | < | < | < | [[FT231XS>https://ftdichip.com/products/​ft231xs/]] | [[FT234X>https://www.ftdichip.com/Produc​ts/ICs/FT234XD.html]] | [[RP2040>https://www.raspberrypi.com/doc​umentation/microcontrollers/rp2040.html]​] | [[FT232H>https://www.ftdichip.com/Produc​ts/ICs/FT232H.htm]] | < | 
-|Max DTR Rate [Mbps] | 3               | <             | <             | 12               | 6   +|Max DTR Rate [Mbps] | 3                           | <                         | <                         | <                      | <                        | <                        | 3                              | 12                 | 6                
-|DYNAMIXEL I/F      | RS-485 x1       | TTL x1         | RS-485 x1, TTL x1 | RS-485 x6, TTL x6 | RS-485 x1, TTL x2 | +|DYNAMIXEL I/F      | RS-485 x1                   | TTL x1                     | RS-485 x1, TTL x1         | RS-485 x1, TTL x1      | RS-485 x5, TTL x5        | RS-485 x4, TTL x4        | RS-485 x1                      | RS-485 x6, TTL x6 | RS-485 x1, TTL x2 | 
-|Isolation          | Yes             | <             | <                 | <                 | No   +|Isolation          | Yes                         | <                         | <                         | No                      | <                       | <                        | <                              | Yes                | No              
-|Dimension [mm]      | 43.2x13.0x8.5   | <             | <             | 52x33x15         | 48x18x14.6 | +|Dimension [mm]      | 43.2x13.0x8.5               | <                         | <                         | 18.0x12.7x9            |                          | 40x40                    | 51x23.5                        | 52x33x15           | 48x18x14.6      
-|Compatible software | [[DYNAMIXEL Wizard 2.0>https://emanual.robotis.com/docs/en/​software/dynamixel/dynamixel_wizard2/]],​ [[R+2.0>https://emanual.robotis.com/docs​/en/software/rplus2/manager/]], [[DXLIB]], [[DX2LIB]], [[DYNAMIXEL SDK>https://emanual.robotis.com/docs/en/​software/dynamixel/dynamixel_sdk/overvie​w/]] | < | < | < |< | +|Other spec.        | Small                        | <                          | <                          | Tiny                    |                          |                          |                                | Power distribution | Plastic enclosure | 
-|Other spec.        | Small            | <             | <             | Power distribution | Plastic enclosure |+|Software compatibility | [[DYNAMIXEL Wizard 2.0>https://emanual.robotis.com/docs/en/​software/dynamixel/dynamixel_wizard2/]],​ [[R+2.0>https://emanual.robotis.com/docs​/en/software/rplus2/manager/]], [[DXLIB]], [[DX2LIB]], [[DYNAMIXEL SDK>https://emanual.robotis.com/docs/en/​software/dynamixel/dynamixel_sdk/overvie​w/]] | < | < | < |< |< |< |< |< |
ここで紹介していない古いI/F製品は概ね旧来のDYNAMIXELを前提としており、装備されているコネクタがmolex社製のものが主である。もちろんConvertible Cableを仲介させる事で現行モデルのDYNAMIXELにも接続できるので、持っているのであれば買い直す必要は無い。 ここで紹介していない古いI/F製品は概ね旧来のDYNAMIXELを前提としており、装備されているコネクタがmolex社製のものが主である。もちろんConvertible Cableを仲介させる事で現行モデルのDYNAMIXELにも接続できるので、持っているのであれば買い直す必要は無い。
Line 71: Line 73:
--XL330~ --XL330~
5V・TTL・JST社コネクタ・コアドモータ・電流センサ搭載。現行モデル。 5V・TTL・JST社コネクタ・コアドモータ・電流センサ搭載。現行モデル。
 +--XC330~
 +5V/12V・TTL・JST社コネクタ・コアレスモータ・電流センサ搭載。現行モデル。
--XL430~ --XL430~
12V・TTL・JST社コネクタ・コアドモータ・電流センサ非搭載で廉価モデルという位置付け。2軸を内蔵した2XLもある。現行モデル。 12V・TTL・JST社コネクタ・コアドモータ・電流センサ非搭載で廉価モデルという位置付け。2軸を内蔵した2XLもある。現行モデル。
Line 79: Line 83:
--XH430/XH540~ --XH430/XH540~
12/24V・TTL/RS-485・JST社コネクタ・maxon社コアレスモータ・電流センサ搭載で、トルクによって2種類の形状(430/540)がある。現行モデル。 12/24V・TTL/RS-485・JST社コネクタ・maxon社コアレスモータ・電流センサ搭載で、トルクによって2種類の形状(430/540)がある。現行モデル。
---XW+--XD430/XD540
-12V・RS-485・防水コネクタ・maxon社コアレスモータ・電流センサ搭載。形状は1種類(540)。IP68に対応。現行モデル。+12V・RS-485・JST社コネクタ・maxon社コアレスモータ・高耐久・電流センサ搭載で、トルクによって2種類の形状(430/540)がある。現行モデル。 
 +--XW430/XW540~ 
 +12V・RS-485・防水コネクタ・maxon社コアレスモータ・電流センサ搭載。IP68に対応。現行モデル。
-Pシリーズ~ -Pシリーズ~
24V・RS-485・JST社コネクタ・コアレス/ブラシレスモータ・電流センサ搭載・高分解能・低バックラッシュ・高トルク。3種類の形状があるのとかなり高価。唯一定格トルクを謳うハイエンド向け現行モデル。 24V・RS-485・JST社コネクタ・コアレス/ブラシレスモータ・電流センサ搭載・高分解能・低バックラッシュ・高トルク。3種類の形状があるのとかなり高価。唯一定格トルクを謳うハイエンド向け現行モデル。
Line 86: Line 92:
MXとXシリーズのストールトルクと無負荷最大回転数をモデルごとにプロットしたグラフを紹介しておく。 MXとXシリーズのストールトルクと無負荷最大回転数をモデルごとにプロットしたグラフを紹介しておく。
#ref(mx_x_torque_vs_speed_graph.png,50%)​ #ref(mx_x_torque_vs_speed_graph.png,50%)​
-アプリケーションに要求されるトルクで選定するのが一般的なのだが、DYNAMIXELの大半のモデルは定格トルクではなくストールトルクを謳っている。ストールトルクはこれ以上は出せないといった値(瞬時最大トルクに近い)であり、連続的に使って良いというトルクでは無い。+アプリケーションに要求されるトルクで選定するのが一般的なのだが、DYNAMIXELの大半のモデルは定格トルクではなくストールトルクを謳っている。ストールトルクは物理的にこの値を超えて出力されることは無い値(瞬時最大トルクに近い)であり、連続的に使って良いという意味では無い。
-なおここではあえてXシリーズ以降を前提とするので、AXやMXシリーズまたは廃盤品を使用する場合は差異を深掘りした上で適宜読み替えてもらうものとする。また廃盤品を含む全ラインナップの一覧は[[こちら>DXLSeries]]に掲載した。+なおここではあえてXシリーズ以降を前提とするので、AXやMXシリーズまたは廃盤品を使用する場合は差異を各自で深掘りした上で適宜読み替えてもらうものとする。また廃盤品を含む全ラインナップの一覧は[[こちら>DXLSeries]]に掲載した。
***各装置の接続 [#zb9535bd] ***各装置の接続 [#zb9535bd]
Line 317: Line 323:
***GCC Developer Lite[#z879d754] ***GCC Developer Lite[#z879d754]
#ref(GCC Developer Lite/GDL.png,50%) #ref(GCC Developer Lite/GDL.png,50%)
-[[GCC Developer Lite]]はマイコンボード製品向けにプログラムを手っ取り早く動かせるツールとして提供している(こちらのリンク先のインストーラは使用しない)が、ここではWindows上で動作するプログラムをコーディング及びコンパイルする事が目的となる。+[[GCC Developer Lite]]はマイコンボード製品向けにプログラムを手っ取り早く動かせるツールとして提供しているが、ここではWindows上で動作するプログラムをコーディング及びコンパイルする事が目的となる。
-最近の統合環境の類いに比べたら極めて少ない機能しか備えておらず正直なところ今更な感じではあるのだが、無償かつ簡単な設定でソースコードの編集とコンパイルはできるという理由だけで選定した。+最近の統合環境の類いに比べたら極めて少ない機能しか備えておらず正直なところ今更な感じではあるのだが、無償かつ簡単な設定でソースコードの編集とコンパイルができるという理由だけで選定した。
****インストーラのダウンロードとインストール [#gcd418f7] ****インストーラのダウンロードとインストール [#gcd418f7]
-Windows向けの実行プログラムを生成する事のみを目的としたインストーラを用意した。今後はWindows XP以前のOSにはインストールできないので悪しからず。 +ここではWindows向けの実行プログラムを生成する事が目的なので、[[ここ>GCC Developer Lite#DOWNLOAD]]から「基本パック」と「WIN64パック」をダウンロードする。手順を間違えるとインストールできないので、[[こちら>GCC Developer Lite#t1d1c731]]を読んでから作業する。
-+
-[[GDL4Win2.6.0.101.exe>https://www.besttechnology.co.jp/downloa​d/GDL4Win2.6.0.101.exe]]~ +
-ファイルサイズ: 229,579,147 byte~ +
-MD5ハッシュ値: 6af66ac7ac3d86b925969b27b1438b90 +
- +
-他のバージョンのGCC Developer Liteと共存させる事を想定していないため、インストール済みのGCC Developer Liteがあれば予めアンインストールしておく事。~ +
-インストーラーを実行するとPCの設定によってはスマートスクリーン・セキュリティー警告・ユーザアカウント制御等のメッセージがしつこく表示される。ダウンロードしたファイルのサイズやハッシュ値が上記と同一であれば「実行(%%%R%%%)」や「はい」を選択してインストールを進めて構わないが、気がかりな場合はインストールそのものを止めここで示す作業を諦めるしかない。~ +
-#ref(GDL_CAmess0.png,40%) +
-#ref(GDL_CAmess1.png,70%) +
-#ref(GDL_CAmess2.png,70%) +
-これらのメッセージが表示されないままインストールそのものを拒否されたり、インストーラーファイルが削除される事も考えられる。その場合はWindowsの署名に関するポリシーを変更したり、アンチウィルスの制限を緩める等の措置を講じる必要がある。~ +
-OSの設定において8.3形式のファイルの生成が有効になっていない場合もインストールを拒否される。その場合は[[管理者権限でコマンドプロンプトを起動>https://www.google.com/search?q=adminis​trator+command+prompt]]し、以下の様に[[fsutil>https://www.google.com/search?q=fsutil+b​ehavior+disable8dot3]]を使って8.3形式のファイル名の生成を有効化してからインストールを行う。 +
- C:\>fsutil behavior set disable8dot3 0 +
- 現在のレジストリの状態は 0 です (すべてのボリューム上で 8dot3 名の作成を有効にします)。 +
- +
- C:\> +
- +
-なお極めて少ない機能と書きつつもかなりの数のライブラリやPythonまで同梱しているので、ファイルの数とサイズは莫大になっている。そのためインストールが完了するまにではそれ相応の時間がかかる。+
****ソースコード編集・ファイル操作 [#m53faff0] ****ソースコード編集・ファイル操作 [#m53faff0]
Line 348: Line 336:
****コンパイルオプション [#l2754b74] ****コンパイルオプション [#l2754b74]
-64ビット版のWindows上で32ビット版のプログラムは実行できるが、その逆はできない。32ビット版と64ビット版のどちらを選んでも大きな違いは感じないと思うが、ここではどちらのOSでも実行できる32ビット版のプログラムを生成する事にする。+Windowsの64bit版を想定したのでWIN64パックをインストールする事としたが、32bitの場合はWIN32パックを選定するまでである。もし32bit版を適用した場合は64bitとある所を32bit、x64とある所をx86に読み替える事。
-コンパイルしたいソースコードを開いている状態で、「ツール(%%%T%%%)」メニュー内の「コンパイルオプション(%%%O%%%)」をクリックしコンパイルオプションダイアログボックを開く。上端の設定リストをドロップダウンし「Windows x86 (Console)」を選んで''OK''ボタンを押すと、32ビット版のコンパイル条件の設定は全て完了する。+コンパイルしたいソースコードを開いている状態で、「ツール(%%%T%%%)」メニュー内の「コンパイルオプション(%%%O%%%)」をクリックしコンパイルオプションダイアログボックを開く。上端の設定リストをドロップダウンし「Windows x64 (Console)」を選んで''OK''ボタンを押すと、32ビット版のコンパイル条件の設定は全て完了する。
#ref(GDL_SelCompileOption.png) #ref(GDL_SelCompileOption.png)
- 
-ちなみに「Windows x64 (Console)」を選ぶと64ビット版の実行ファイルを生成するためのコンパイル条件となる。 
なお後述の[[ビルド>#w2cf95a0]]や[[デバッグ情報付きビルド>#hf8ec1c4]]を行った既存のソースコードファイルを開き直した場合、「保存済みの環境設定ファイルが見つかりました。」というメッセージダイアログボックスが表示される。 なお後述の[[ビルド>#w2cf95a0]]や[[デバッグ情報付きビルド>#hf8ec1c4]]を行った既存のソースコードファイルを開き直した場合、「保存済みの環境設定ファイルが見つかりました。」というメッセージダイアログボックスが表示される。
Line 414: Line 400:
-[[GCC Developer Liteをインストール>#gcd418f7]]し[[コンパイルオプションを設定>#l2754b74]] -[[GCC Developer Liteをインストール>#gcd418f7]]し[[コンパイルオプションを設定>#l2754b74]]
-ソースコード中のCOMポート番号を自身の環境に合わせて修正 -ソースコード中のCOMポート番号を自身の環境に合わせて修正
--ソースコードファイルは拡張子が「.c」になるようにDX2LIBの4つのファイルを抽出した場所に保存+-ソースコードファイルの拡張子は「.c」となるようにし、先のDX2LIBの4つのファイルを抽出した場所に保存
である。 である。
Line 443: Line 429:
#include "dx2lib.h"  // DX2LIBのヘッダ #include "dx2lib.h"  // DX2LIBのヘッダ
-void main (void) {+int main (void) {
  // DLLの読み込み(成功は必須)   // DLLの読み込み(成功は必須)
  if (LoadDLL ()) {   if (LoadDLL ()) {
Line 480: Line 466:
#html{{ #html{{
 +<style type="text/css">
 +    .syntaxhighlighter {
 +     overflow-y: auto !important;
 +     overflow-x: auto !important;
 +     max-height: 900px;
 +     -webkit-text-size-adjust: 100%;
 +    }
 +</style>
<pre class="brush: c;"> <pre class="brush: c;">
// ID番号を決め打ちする場合 // ID番号を決め打ちする場合
Line 486: Line 480:
#include "dx2lib.h" #include "dx2lib.h"
-void main (void) {+int main (void) {
  // DLLの読み込み(成功は必須)   // DLLの読み込み(成功は必須)
  if (LoadDLL ()) {   if (LoadDLL ()) {
Line 526: Line 520:
#include "dx2lib.h" #include "dx2lib.h"
-void main (void) {+int main (void) {
  if (LoadDLL ()) {   if (LoadDLL ()) {
   uint8_t IDs[253];  // 見つかったIDのリスト    uint8_t IDs[253];  // 見つかったIDのリスト
Line 569: Line 563:
#include "dx2lib.h" #include "dx2lib.h"
-void main (void) {+int main (void) {
  if (LoadDLL ()) {   if (LoadDLL ()) {
   uint8_t IDs[253];  // 見つかったIDのリスト    uint8_t IDs[253];  // 見つかったIDのリスト
Line 620: Line 614:
} }
-void main (void) {+int main (void) {
  if (LoadDLL ()) {   if (LoadDLL ()) {
   uint8_t IDs[253];  // 見つかったIDのリスト    uint8_t IDs[253];  // 見つかったIDのリスト
Line 676: Line 670:
#include "dx2lib.h" #include "dx2lib.h"
-void main (void) {+int main (void) {
  if (LoadDLL ()) {   if (LoadDLL ()) {
   uint8_t IDs[253];  // 見つかったIDのリスト    uint8_t IDs[253];  // 見つかったIDのリスト
Line 720: Line 714:
#include "dx2lib.h" #include "dx2lib.h"
-void main (void) {+int main (void) {
  if (LoadDLL ()) {   if (LoadDLL ()) {
   uint8_t IDs[253];  // 見つかったIDのリスト    uint8_t IDs[253];  // 見つかったIDのリスト
Line 772: Line 766:
#include "dx2lib.h" #include "dx2lib.h"
-void main (void) {+int main (void) {
  if (LoadDLL ()) {   if (LoadDLL ()) {
   uint8_t IDs[253];  // 見つかったIDのリスト    uint8_t IDs[253];  // 見つかったIDのリスト
Line 863: Line 857:
} }
-void main (void) {+int main (void) {
  if (LoadDLL ()) {   if (LoadDLL ()) {
   TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000);    TDeviceID dev = DX2_OpenPort ("\\\\.\\COM3", 1000000);
Line 949: Line 943:
} }
-void main (void) {+int main (void) {
  pthread_t th1, th2;   pthread_t th1, th2;
Line 994: Line 988:
またあえて諸々の通信速度を9600bpsに変更した場合も、最終的な動作にどの程度影響するかを試しても面白い。~ またあえて諸々の通信速度を9600bpsに変更した場合も、最終的な動作にどの程度影響するかを試しても面白い。~
その結果、Windows上でいわゆるリアルタイム性を担保するのは一筋縄ではいかない事が分かってもらえれば良いかと。 その結果、Windows上でいわゆるリアルタイム性を担保するのは一筋縄ではいかない事が分かってもらえれば良いかと。
 +
***他の言語 [#d5eca280] ***他の言語 [#d5eca280]
-既に[[DX2LIBのページ>DX2LIB#u3ff03d9]]に紹介されている通りだが、基本的にC言語上で動作させることが前提となっているために扱いづらい事が多々ある。それでも使えない事も無いので、やけに流行っている[[Python>https://www.python.jp/]]で遊んでみても面白いかと思う。~ +****Pythonその1 [#t51150f3] 
-紹介したGCC Developer LiteにはPython3が含まれているので、コンパイルオプションで「Python(32bit)」を選んでライブラリのサンプルソースコードを適宜修正して是非実行してみて欲しい。+[[DX2LIBのページ>DX2LIB#u3ff03d9]]に紹介されている通りだが、その中でも流行の[[Python>https://www.python.jp/]]で遊んでみても面白いかと思う。~ 
 +GCC Developer LiteにはPythonが含まれており、コンパイルオプションで「Python(32bit)」や「Python(64bit)」を選ぶ事でライブラリに同梱されるPythonのサンプルソースコードが利用できる。しかし追加APIを利用したものばかりで、コントロールテーブルを読み書きする基本的なAPIは使われていない。 
 + 
 +ここでは基本的なAPIのみを使った例を紹介するが、dx2lib.pyはC言語で作成されたライブラリをPythonで利用するためだけの記述がなされているため、Pythonならではの柔軟なコードがctypesによって変に厳格化されて使い勝手が悪い。~ 
 +ひとまずDynamixel Xシリーズを前提とし、主要なアイテムのアドレスとコントロールテーブルを宣言してみた。 
 +#html{{ 
 +<pre class="brush: python;"> 
 +from ctypes import * 
 + 
 +ADDRESS_X_MODEL_NUMBER          = 0  #W 
 +ADDRESS_X_MODEL_INFORMATION    = 4  #L 
 +ADDRESS_X_VERSION_FW            = 6  #B 
 +ADDRESS_X_ID                    = 7  #B 
 +ADDRESS_X_BAUDRATE              = 8  #B 
 +ADDRESS_X_RETURN_DELAY_TIME    = 9  #B 
 +ADDRESS_X_DRIVE_MODE            = 10  #B 
 +ADDRESS_X_OPERATIING_MODE      = 11  #B 
 +ADDRESS_X_HOMING_OFFSET        = 20  #L 
 +ADDRESS_X_MAX_POSITION_LIMIT    = 48  #L 
 +ADDRESS_X_MIN_POSITION_LIMIT    = 52  #L 
 +ADDRESS_X_TORQUE_ENABLE        = 64  #B 
 +ADDRESS_X_LED_RED              = 65  #B 
 +ADDRESS_X_STATUS_RETURN_LEVEL  = 69  #B 
 +ADDRESS_X_HARDWARE_ERROR_STATUS = 70  #B 
 +ADDRESS_X_VELOCITY_IGAIN        = 76  #W 
 +ADDRESS_X_VELOCITY_PGAIN        = 78  #W 
 +ADDRESS_X_POSITION_DGAIN        = 80  #W 
 +ADDRESS_X_POSITION_IGAIN        = 82  #W 
 +ADDRESS_X_POSITION_PGAIN        = 84  #W 
 +ADDRESS_X_GOAL_PWM              = 100 #W 
 +ADDRESS_X_GOAL_CURRENT          = 102 #W 
 +ADDRESS_X_GOAL_VELOCITY        = 104 #L 
 +ADDRESS_X_PROF_ACCELERATION    = 108 #L 
 +ADDRESS_X_PROF_VELOCITY        = 112 #L 
 +ADDRESS_X_GOAL_POSITION        = 116 #L 
 +ADDRESS_X_MOVING                = 122 #B 
 +ADDRESS_X_MOVING_STATUS        = 123 #B 
 +ADDRESS_X_PRESENT_PWM          = 124 #W 
 +ADDRESS_X_PRESENT_CURRENT      = 126 #W 
 +ADDRESS_X_PRESENT_VELOCITY      = 128 #L 
 +ADDRESS_X_PRESENT_POSITION      = 132 #L 
 +ADDRESS_X_VELOCITY_TRAJECTORY  = 136 #L 
 +ADDRESS_X_POSITION_TRAJECTORY  = 140 #L 
 +ADDRESS_X_PRESENT_VOLTAGE      = 144 #W 
 +ADDRESS_X_PRESENT_TEMP          = 146 #B 
 + 
 +class XCtrlTable(Structure): 
 +  _pack_ = 1 
 +  _fields_ = [ 
 +   ('ModelNumber',                c_uint16), 
 +   ('ModelInformation',            c_uint32), 
 +   ('VersionofFirmware',          c_uint8), 
 +   ('ID',                          c_uint8), 
 +   ('Baudrate',                    c_uint8), 
 +   ('ReturnDelayTime',            c_uint8), 
 +   ('DriveMode',                  c_uint8), 
 +   ('OperatingMode',              c_uint8), 
 +   ('SecondaryID',                c_uint8), 
 +   ('ProtocolVersion',            c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('HomingOffset',                c_int32), 
 +   ('MovingThreshold',            c_uint32), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('TemperatureLimit',            c_uint8), 
 +   ('MaxVoltageLimit',            c_uint16), 
 +   ('MinVoltageLimit',            c_uint16), 
 +   ('PWMLimit',                    c_uint16), 
 +   ('CurrentLimit',                c_uint16), 
 +   ('AccelerationLimit',          c_uint32), 
 +   ('VelocityLimit',              c_uint32), 
 +   ('MaxPositionLimit',            c_uint32), 
 +   ('MinPositionLimit',            c_uint32), 
 +   ('ExternalPortMode1',          c_uint8), 
 +   ('ExternalPortMode2',          c_uint8), 
 +   ('ExternalPortMode3',          c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('Shutdown',                    c_uint8), 
 +   ('TorqueEnable',                c_uint8), 
 +   ('LED',                        c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('StatusReturnLevel',          c_uint8), 
 +   ('RegisteredInstruction',      c_uint8), 
 +   ('HardwareErrorStatus',        c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('VelocityIGain',              c_uint16), 
 +   ('VelocityPGain',              c_uint16), 
 +   ('PositionDGain',              c_uint16), 
 +   ('PositionIGain',              c_uint16), 
 +   ('PositionPGain',              c_uint16), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('FeedforwardAccelerationGain', c_uint16), 
 +   ('FeedforwardVelocityGain, ',  c_uint16), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('BusWatchdog',                c_int8), 
 +   ('reserve',                    c_uint8), 
 +   ('GoalPWM',                    c_int16), 
 +   ('GoalCurrent',                c_int16), 
 +   ('GoalVelocity',                c_int32), 
 +   ('ProfileAcceleration',        c_uint32), 
 +   ('ProfileVelocity',            c_uint32), 
 +   ('GoalPosition',                c_int32), 
 +   ('RealtimeTick',                c_uint16), 
 +   ('Moving',                      c_uint8), 
 +   ('MovingStatus',                c_uint8), 
 +   ('PresentPWM',                  c_int16), 
 +   ('PresentCurrent',              c_int16), 
 +   ('PresentVelocity',            c_int32), 
 +   ('PresentPosition',            c_int32), 
 +   ('VelocityTrajectory',          c_uint32), 
 +   ('PositionTrajectory',          c_uint32), 
 +   ('PresentInputVoltage',        c_uint16), 
 +   ('PresentTemperature',          c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('reserve',                    c_uint8), 
 +   ('ExternalPortData1',          c_uint16), 
 +   ('ExternalPortData2',          c_uint16), 
 +   ('ExternalPortData3',          c_uint16)] 
 + 
 +class VelocityGain(Structure): 
 +  _pack_ = 1 
 +  _fields_ = [ 
 +   ('VelocityIGain',              c_uint16), 
 +   ('VelocityPGain',              c_uint16)] 
 + 
 +class PositionGain(Structure): 
 +  _pack_ = 1 
 +  _fields_ = [ 
 +   ('PositionDGain',              c_uint16), 
 +   ('PositionIGain',              c_uint16), 
 +   ('PositionPGain',              c_uint16)] 
 +</pre> 
 +}} 
 +例えばPresent PositionをID=1のDynamixelのコントロールテーブルから読み出すには、以下のコードで対応できる。 
 +#html{{ 
 +<pre class="brush: python;"> 
 +from dx2lib import * 
 + 
 +dev = DX2_OpenPort (b'\\\\.\\COM3', 57600) 
 +if dev != None: 
 +  ppos = c_int32() 
 +  err = c_uint16() 
 +  DX2_ReadLongData (dev, 1, ADDRESS_X_PRESENT_POSITION, cast(byref(ppos), POINTER(c_uint32)), err) 
 +  DX2_ClosePort (dev) 
 +</pre> 
 +}} 
 +更にXCtrlTable構造体を用いて変数を定義し、その変数へID=1のDynamixelのコントロールテーブルから一括読み出した値を保存する場合はDX2_ReadBlockDataを用いる。 
 +#html{{ 
 +<pre class="brush: python;"> 
 +from dx2lib import * 
 + 
 +dev = DX2_OpenPort (b'\\\\.\\COM3', 57600) 
 +if dev != None: 
 +  tbl = XCtrlTable() 
 +  err = c_uint16() 
 +  if DX2_ReadBlockData (dev, 1, ADDRESS_X_MODEL_NUMBER, cast(byref(tbl), POINTER(c_uint8)), sizeof(tbl), err): 
 +   for field in tbl._fields_: 
 +     if field[0] != 'reserve': 
 +       print (tbl.__class__.__dict__[field[0]].offset​, field[0], '=', getattr(tbl, field[0])) 
 +  DX2_ClosePort (dev) 
 +</pre> 
 +}} 
 +位置決め制御時のPIDゲインを設定する等の連続したアドレスに割り当てられた複数のアイテムを同時に書き換える場合はDX2_WriteBlockDataを用いる。 
 +#html{{ 
 +<pre class="brush: python;"> 
 +from dx2lib import * 
 + 
 +dev = DX2_OpenPort (b'\\\\.\\COM3', 57600) 
 +if dev != None: 
 +  gain = PositionGain(0, 0, 400) 
 +  DX2_WriteBlockData (dev, 1, ADDRESS_X_POSITION_DGAIN, cast(byref(gain), POINTER(c_uint8)), sizeof (gain), None) 
 +  DX2_ClosePort (dev) 
 +</pre> 
 +}} 
 +追加APIに無い機能は基本APIを使ってコントロールテーブルへ直接アクセスするしかないため、参考にしてもらえればと思う。 
 + 
 +****Pythonその2 [#k91ab321] 
 +全てPythonで書いてしまえば、態々Cで作られたライブラリをリンクしたり、ctypesの有象無象に呵まれる事も無くスッキリする。という事でインストラクションパケットの生成と送信、ステータスパケットの受信、インストラクションの種類によってパラメータ部分を結合・分解するクラスを作ってみた。前記の内容とは一線を画すので、互換性は一切考えていない。 
 +[[pySerial>https://github.com/pyserial/p​yserial]]は必須。~ 
 +なお単体で試せるようにデモコードを付けておいた。ターゲットはXM/XH/XDシリーズ、IDは1~5(少なくともID=1は必須)、ボーレートは3Mbpsを想定、デバッグ用にpsutilが必要、マルチスレッドでアクセスといったところだ。プロトコルV1に対応したDynamixelの場合は末尾のコメントになっているコードを利用してもらえればと。~ 
 +一応関数に与えられたパラメータを確認しているが、全領域で検証したものではない。また値の正負については勝手に判断している部分もあるので、都合が悪ければ適宜修正してもらえればと思う。 
 + 
 +[[ソースへの直リンク>https://github.com/mukyokyo/pyDXL/raw/m​ain/pyDXL.py]] 
 + 
 +#html{{ 
 +<pre class="brush: python;"  title="pyDXL.py"> 
 +#!/usr/bin/python3 
 +# -*- coding: utf-8 -*- 
 +
 +# pyDXL.py 
 +# SPDX-License-Identifier: MIT 
 +# SPDX-FileCopyrightText: (C) 2024 mukyokyo 
 + 
 +import serial, threading, multiprocessing, array, struct 
 +from typing import Union 
 +from collections import namedtuple 
 +from struct import pack, unpack, iter_unpack 
 + 
 +########################################​################## 
 +# Functionalized the part of converting int to bytes. 
 +# If specified as a tuple, it is converted to bytes at once. 
 +########################################​################## 
 +def B2Bs(d) -> bytes: 
 +  if isinstance(d, list) or isinstance(d, tuple): 
 +   return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d) 
 +  else: 
 +   return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d)) 
 + 
 +def W2Bs(d) -> bytes: 
 +  if isinstance(d, list) or isinstance(d, tuple): 
 +   return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]]) 
 +  else: 
 +   return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d)) 
 + 
 +def L2Bs(d) -> bytes: 
 +  if isinstance(d, list) or isinstance(d, tuple): 
 +   return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]]) 
 +  else: 
 +   return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d)) 
 + 
 +########################################​################## 
 +# API for Dynamixel protocol V1 
 +########################################​################## 
 +class DXLProtocolV1: 
 +  BROADCASTING_ID = 0xfe 
 +  INST_PING          = 0x01 
 +  INST_READ          = 0x02 
 +  INST_WRITE          = 0x03 
 +  INST_REG_WRITE      = 0x04 
 +  INST_ACTION        = 0x05 
 +  INST_FACTORY_RESET  = 0x06 
 +  INST_REBOOT        = 0x08 
 +  INST_SYNC_WRITE    = 0x83 
 +  INST_SYNG_REG_WRITE = 0x85 
 + 
 +  TSyncW = namedtuple("TSyncW", ("id", ("data"))) 
 + 
 +  def __init__(self, port : Union[serial.Serial, str], baudrate = 57600, timeout = 0.05, lock = None): 
 +   """ 
 +   Initalize 
 + 
 +   parameters 
 +   ------------- 
 +   port : str 
 +     Device name 
 +   baudrate : int 
 +     Serial baudrate[bps] 
 +   timeout : float 
 +     Read timeout[s] 
 +   """ 
 +   if isinstance(port, serial.Serial): 
 +     self.__serial = port 
 +     self.__baudrate = port.baudrate 
 +     self.__timeout = port.timeout 
 +   else: 
 +     self.__serial = serial.Serial(port, baudrate = baudrate, timeout = timeout) 
 +     self.__baudrate = self.__serial.baudrate 
 +     self.__timeout = self.__serial.timeout 
 +   if lock == None: 
 +     self.__lock = threading.Lock() 
 +   else: 
 +     self.__lock = lock 
 +   self.__Error = 0 
 + 
 +  @property 
 +  def lock(self): 
 +   return self.__lock 
 + 
 +  @property 
 +  def baudrate(self): 
 +   return self.__serial.baudrate 
 + 
 +  @baudrate.setter 
 +  def baudrate(self, baudrate): 
 +   self.__baudrate = baudrate 
 +   self.__serial.baudrate = baudrate 
 + 
 +  @property 
 +  def timeout(self): 
 +   return self.__serial.timeout 
 + 
 +  @timeout.setter 
 +  def timeout(self, timeout): 
 +   self.__timeout = timeout 
 +   self.__serial.timeout = timeout 
 + 
 +  def __reconfig(self): 
 +   self.__serial.baudrate = self.__baudrate 
 +   self.__serial.timeout = self.__timeout 
 + 
 +  @property 
 +  def Error(self): 
 +   return self.__Error 
 + 
 +  def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> (bytes, bool): 
 +   """ 
 +   Sending packets 
 + 
 +   parameters 
 +   ------------- 
 +   id : int 
 +     Target ID 
 +   inst : int 
 +     Instruction command 
 +   param : bytes 
 +     Packet parameters 
 + 
 +   Returns 
 +   ------- 
 +   bytes 
 +     Packets sent 
 +   bool 
 +     Success or failure 
 +   """ 
 +   self.__reconfig() 
 +   if ((id == self.BROADCASTING_ID) or (id >= 0 and id <= 253)) and len(param) <= (256 - 6): 
 +     instp = bytearray([0xff,0xff,id,0,inst]) + bytes(param) 
 +     instp[3] = len(instp) - 3 
 +     instp += B2Bs(~sum(instp[2:]) & 0xff) 
 +     self.__serial.reset_input_buffer() 
 +     if echo: print('TX:', instp.hex(':')) 
 +     self.__serial.write(instp) 
 +     return bytes(instp), True 
 +   return None, False 
 + 
 +  def __rx(self, length) -> bytes: 
 +   s = self.__serial.read(length) 
 +   l = len(s) 
 +   if l == length: 
 +     return s 
 +   else: 
 +     r = s 
 +     length -= l 
 +     if length > 0: 
 +       while self.__serial.in_waiting > 0: 
 +         s = self.__serial.read(length) 
 +         r += s 
 +         length -= len(s) 
 +         if length == 0: 
 +           break 
 +     return r 
 + 
 +  def RxPacket(self, echo = False) -> (bytes, bool): 
 +   """ 
 +   Receiving packets 
 + 
 +   Returns 
 +   ------- 
 +   bytes 
 +     Packets received 
 +   bool 
 +     Success or failure 
 +   """ 
 +   statp = self.__rx(5) 
 +   if statp: 
 +     if len(statp) == 5: 
 +       if statp[0] == 0xff and statp[1] == 0xff: 
 +         l = statp[3] - 1 
 +         self.__Error = statp[4] 
 +         statp += self.__rx(l) 
 +         if len(statp) == l + 5: 
 +           if statp[-1:][0] == ((~sum(statp[2:-1])) & 0xff): 
 +             if echo: print('RX:', statp.hex(':')) 
 +             return bytes(statp), (statp[4] & 0x40) == 0 
 +   return None, False 
 + 
 +  def Write(self, id : int, addr : int, data : bytes, echo = False) -> bool: 
 +   """ 
 +   Write instruction 
 + 
 +   parameters 
 +   ------------- 
 +   id : int 
 +     Target ID 
 +   addr : int 
 +     Target item address 
 +   data : bytes 
 +     Data to be written 
 + 
 +   Returns 
 +   ------- 
 +   result : bool 
 +     Success or failure 
 +   """ 
 +   with self.__lock: 
 +     if id >= 0 and id <= self.BROADCASTING_ID and addr >= 0  and addr <= 254: 
 +       if self.TxPacket(id, self.INST_WRITE, B2Bs(addr) + data, echo)[1]: 
 +         if id != self.BROADCASTING_ID: 
 +           dat, r = self.RxPacket(echo) 
 +           if r: 
 +             return dat[2] == id and (dat[4] & 0x18) == 0 
 +         else: 
 +           return True 
 +     return False 
 + 
 +  def Write8(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: 
 +   return self.Write(id, addr, B2Bs(data), echo) 
 + 
 +  def Write16(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: 
 +   return self.Write(id, addr, W2Bs(data), echo) 
 + 
 +  def Write32(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: 
 +   return self.Write(id, addr, L2Bs(data), echo) 
 + 
 +  def Read(self, id : int, addr : int, length : int, echo = False) -> bytes: 
 +   """ 
 +   Read instruction 
 + 
 +   parameters 
 +   ------------- 
 +   id : int 
 +     Target ID 
 +   addr : int 
 +     Target item address 
 +   length : int 
 +     Number of bytes to read 
 + 
 +   Returns 
 +   ------- 
 +   bytes 
 +     Data read 
 +   """ 
 +   with self.__lock: 
 +     if id >= 0 and id <= 253 and addr >= 0 and addr <= 254 and length > 0 and length <= (256 - 6): 
 +       if self.TxPacket(id, self.INST_READ, B2Bs((addr, length)), echo)[1]: 
 +         dat, r = self.RxPacket(echo) 
 +         if r: 
 +           if dat[2] == id and (dat[4] & 0x8) == 0: 
 +             return bytes(dat[5:-1]) 
 +     return None 
 + 
 +  def Read8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
 +   r = self.Read(id, addr, length, echo) 
 +   if r != None: 
 +     n = sum(iter_unpack('b' if signed else 'B', r), ()) 
 +     return n if length > 1 else n[0] 
 +   return None 
 + 
 +  def Read16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
 +   r = self.Read(id, addr, 2 << (length - 1), echo) 
 +   if r != None: 
 +     n = sum(iter_unpack('h' if signed else 'H', r), ()) 
 +     return n if length > 1 else n[0] 
 +   return None 
 + 
 +  def Read32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
 +   r = self.Read(id, addr, 4 << (length - 1), echo) 
 +   if r != None: 
 +     n = sum(iter_unpack('i' if signed else 'I', r), ()) 
 +     return n if length > 1 else n[0] 
 +   return None 
 + 
 +  def SyncWrite(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool: 
 +   """ 
 +   Sync Write instruction 
 + 
 +   parameters 
 +   ------------- 
 +   addr : int 
 +     Target item address 
 +   length : int 
 +     Number of bytes to write 
 +   id_datas : (TSyncW) 
 +     Target ID and data 
 + 
 +   Returns 
 +   ------- 
 +   bool 
 +     Success or failure 
 +   """ 
 +   with self.__lock: 
 +     if addr >= 0 and addr <= 254 and length > 0 and length < (256 - 6): 
 +       param = B2Bs((addr,length)) 
 +       for d in id_datas: 
 +         param += B2Bs(d.id) + d.data 
 +         if len(d.data) != length or d.id < 0 or d.id > 253: 
 +           del param 
 +           return False 
 +       return self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)[1] 
 +     return False 
 + 
 +  def Ping(self, id : int, echo = False) -> bool: 
 +   with self.__lock: 
 +     if self.TxPacket(id, self.INST_PING, bytes(), echo)[1]: 
 +       dat, r = self.RxPacket(echo) 
 +       if r: 
 +         return id == dat[2] and dat[3] == 2 
 +     return False 
 + 
 +  def FactoryReset(self, id : int, echo = False) -> bool: 
 +   with self.__lock: 
 +     if self.TxPacket(id, self.INST_FACTORY_RESET, bytes(), echo)[1]: 
 +       dat, r = self.RxPacket(echo) 
 +       if r: 
 +         return id == dat[2] and dat[3] == 2 
 +     return False 
 + 
 +  def Reboot(self, id : int, echo = False) -> bool: 
 +   with self.__lock: 
 +     if self.TxPacket(id, self.INST_REBOOT, bytes(), echo)[1]: 
 +       dat, r = self.RxPacket(echo) 
 +       if r: 
 +         return id == dat[2] and dat[3] == 2 
 +     return False 
 + 
 +########################################​################## 
 +# API for Dynamixel protocol V2 
 +########################################​################## 
 +class DXLProtocolV2: 
 +  BROADCASTING_ID = 0xfe 
 +  INST_PING                = 0x01 
 +  INST_READ                = 0x02 
 +  INST_WRITE                = 0x03 
 +  INST_REG_WRITE            = 0x04 
 +  INST_ACTION              = 0x05 
 +  INST_FACTORY_RESET        = 0x06 
 +  INST_REBOOT              = 0x08 
 +  INST_SYS_WRITE            = 0x0d 
 +  INST_CLEAR                = 0x10 
 +  INST_CONTROL_TABLE_BACKUP = 0x20 
 +  INST_STATUS              = 0x55 
 +  INST_SYNC_READ            = 0x82 
 +  INST_SYNC_WRITE          = 0x83 
 +  INST_SYNG_REG_WRITE      = 0x85 
 +  INST_FAST_SYNC_READ      = 0x8a 
 +  INST_BULK_READ            = 0x92 
 +  INST_BULK_WRITE          = 0x93 
 +  INST_FAST_BULK_READ      = 0x9a 
 + 
 +  TSyncW = namedtuple("TSyncW", ("id", ("data"))) 
 +  TBulkW = namedtuple("TBulkW", ("id", "addr", ("data"))) 
 +  TBulkR = namedtuple("TBulkR", ("id", "addr", "length")) 
 + 
 +  __crc16_lutable = array.array('H') 
 + 
 +  def __init__(self, port : Union[serial.Serial, str], baudrate = 57600, timeout = 0.05, lock = None): 
 +   """ 
 +   Initalize 
 + 
 +   parameters 
 +   ------------- 
 +   port : str 
 +     Device name 
 +   baudrate : int 
 +     Serial baudrate[bps] 
 +   timeout : float 
 +     Read timeout[s] 
 +   """ 
 +   if isinstance(port, serial.Serial): 
 +     self.__serial = port 
 +     self.__baudrate = port.baudrate 
 +     self.__timeout = port.timeout 
 +   else: 
 +     self.__serial = serial.Serial(port, baudrate = baudrate, timeout = timeout) 
 +     self.__baudrate = self.__serial.baudrate 
 +     self.__timeout = self.__serial.timeout 
 +   if lock == None: 
 +     self.__lock = threading.Lock() 
 +   else: 
 +     self.__lock = lock 
 +   self.__Error = 0 
 +   poly = 0x8005 
 +   for i in range(256): 
 +     nData = i << 8 
 +     nAccum = 0 
 +     for j in range(8): 
 +       nAccum = ((nAccum << 1) ^ poly if (nData ^ nAccum) & 0x8000 else nAccum << 1) & 0xffff 
 +       nData <<= 1 
 +     self.__crc16_lutable.append(nAccum) 
 + 
 +  @property 
 +  def lock(self): 
 +   return self.__lock 
 + 
 +  @property 
 +  def baudrate(self): 
 +   return self.__serial.baudrate 
 + 
 +  @baudrate.setter 
 +  def baudrate(self, baudrate): 
 +   self.__baudrate = baudrate 
 +   self.__serial.baudrate = baudrate 
 + 
 +  @property 
 +  def timeout(self): 
 +   return self.__serial.timeout 
 + 
 +  @timeout.setter 
 +  def timeout(self, timeout): 
 +   self.__timeout = timeout 
 +   self.__serial.timeout = timeout 
 + 
 +  def __reconfig(self): 
 +   self.__serial.baudrate = self.__baudrate 
 +   self.__serial.timeout = self.__timeout 
 + 
 +  @property 
 +  def Error(self): 
 +   return self.__Error 
 + 
 +  def __crc16(self, data : bytes) -> int: 
 +   crc = 0 
 +   for d in data: 
 +     crc = (crc << 8) ^ self.__crc16_lutable[(((crc >> 8) ^ d) & 0xff)] 
 +   return crc & 0xffff 
 + 
 +  def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> (bytes, bool): 
 +   """ 
 +   Sending packets 
 + 
 +   parameters 
 +   ------------- 
 +   id : int 
 +     Target ID 
 +   inst : int 
 +     Instruction command 
 +   param : bytes 
 +     Packet parameters 
 + 
 +   Returns 
 +   ------- 
 +   bytes 
 +     Packets sent 
 +   bool 
 +     Success or failure 
 +   """ 
 +   self.__reconfig() 
 +   if ((id == self.BROADCASTING_ID) or (id >= 0 and id <= 252)) and len(param) < (65536 - 10): 
 +     instp = bytearray([0xff,0xff,0xfd,0x00,id,0,0,in​st]) + bytearray(param).replace(b'\xff\xff\xfd'​,b'\xff\xff\xfd\xfd') 
 +     instp[5:7] = W2Bs(len(instp) - 5) 
 +     instp += W2Bs(self.__crc16(instp)) 
 +     self.__serial.reset_input_buffer() 
 +     if echo: print('TX:', instp.hex(':')) 
 +     self.__serial.write(instp) 
 +     return bytes(instp), True 
 +   return None, False 
 + 
 +  def __rx(self, length) -> bytes: 
 +   s = self.__serial.read(length) 
 +   l = len(s) 
 +   if l == length: 
 +     return s 
 +   else: 
 +     r = s 
 +     length -= l 
 +     if length > 0: 
 +       while self.__serial.in_waiting > 0: 
 +         s = self.__serial.read(length) 
 +         r += s 
 +         length -= len(s) 
 +         if length == 0: 
 +           break 
 +     return r 
 + 
 +  def RxPacket(self, echo = False) -> (bytes, bool): 
 +   """ 
 +   Receiving packets 
 + 
 +   Returns 
 +   ------- 
 +   bytes 
 +     Packets received 
 +   bool 
 +     Success or failure 
 +   """ 
 +   self.__serial.flush() 
 +   statp = self.__rx(9) 
 +   if statp: 
 +     if len(statp) == 9: 
 +       if statp[0] == 0xff and statp[1] == 0xff and statp[2] == 0xfd and statp[3] == 0 and statp[7] == 0x55: 
 +         l = unpack('<H', statp[5:7])[0] - 2 
 +         self.__Error = statp[8] 
 +         statp += self.__rx(l) 
 +         if len(statp) == l + 9: 
 +           if unpack('<H', statp[-2:])[0] == self.__crc16(statp[:-2]): 
 +             statp = statp[0:9] + statp[9:].replace(b'\xff\xff\xfd\xfd',b'​\xff\xff\xfd') 
 +             if echo: print('RX:', statp.hex(':')) 
 +             return bytes(statp), (statp[8] & 0x7f) == 0 
 +   return None, False 
 + 
 +  def Write(self, id : int, addr : int, data : bytes, echo = False) -> bool: 
 +   """ 
 +   Write instruction 
 + 
 +   parameters 
 +   ------------- 
 +   id : int 
 +     Target ID 
 +   addr : int 
 +     Target item address 
 +   data : bytes 
 +     Data to be written 
 + 
 +   Returns 
 +   ------- 
 +   result : bool 
 +     Success or failure 
 +   """ 
 +   with self.__lock: 
 +     if ((id >= 0 and id <= 252) or (id == self.BROADCASTING_ID)) and addr >= 0 and addr <= 65535: 
 +       if self.TxPacket(id, self.INST_WRITE, W2Bs(addr) + data, echo)[1]: 
 +         if id != self.BROADCASTING_ID: 
 +           dat, r = self.RxPacket(echo) 
 +           if r: 
 +             return dat[4] == id and (dat[8] & 0x7f) == 0 
 +         else: 
 +           return True 
 +     return False 
 + 
 +  def Write8(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: 
 +   return self.Write(id, addr, B2Bs(data), echo) 
 + 
 +  def Write16(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: 
 +   return self.Write(id, addr, W2Bs(data), echo) 
 + 
 +  def Write32(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: 
 +   return self.Write(id, addr, L2Bs(data), echo) 
 + 
 +  def Read(self, id : int, addr : int, length : int, echo = False) -> bytes: 
 +   """ 
 +   Read instruction 
 + 
 +   parameters 
 +   ------------- 
 +   id : int 
 +     Target ID 
 +   addr : int 
 +     Target item address 
 +   length : int 
 +     Number of bytes to read 
 + 
 +   Returns 
 +   ------- 
 +   bytes 
 +     Data read 
 +   bool 
 +     Success or failure 
 +   """ 
 +   with self.__lock: 
 +     if id >= 0 and id <= 252 and addr >= 0 and addr <= 65535 and length > 0 and length < (65536 - 10): 
 +       if self.TxPacket(id, self.INST_READ, W2Bs(addr) + W2Bs(length), echo)[1]: 
 +         dat, r = self.RxPacket(echo) 
 +         if r: 
 +           return bytes(dat[9:-2]) 
 +     return None 
 + 
 +  def Read8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
 +   r = self.Read(id, addr, length, echo) 
 +   if r != None: 
 +     n = sum(iter_unpack('b' if signed else 'B', r), ()) 
 +     return n if length > 1 else n[0] 
 +   return None 
 + 
 +  def Read16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
 +   r = self.Read(id, addr, 2 << (length - 1), echo) 
 +   if r != None: 
 +     n = sum(iter_unpack('h' if signed else 'H', r), ()) 
 +     return n if length > 1 else n[0] 
 +   return None 
 + 
 +  def Read32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
 +   r = self.Read(id, addr, 4 << (length - 1), echo) 
 +   if r != None: 
 +     n = sum(iter_unpack('i' if signed else 'I', r), ()) 
 +     return n if length > 1 else n[0] 
 +   return None 
 + 
 +  def SyncWrite(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool: 
 +   """ 
 +   Sync Write instruction 
 + 
 +   parameters 
 +   ------------- 
 +   addr : int 
 +     Target item address 
 +   length : int 
 +     Number of bytes to write 
 +   id_datas : (TSyncW) 
 +     Target ID and data 
 + 
 +   Returns 
 +   ------- 
 +   bool 
 +     Success or failure 
 +   """ 
 +   with self.__lock: 
 +     if addr >= 0 and addr <= 65535 and length > 0 and length < (65536 - 10): 
 +       param = W2Bs(addr) + W2Bs(length) 
 +       for d in id_datas: 
 +         param += bytes((d.id,)) + d.data 
 +         if len(d.data) != length or d.id < 0 or d.id > 252: 
 +           del param 
 +           return False 
 +       return self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)[1] 
 +     return False 
 + 
 +  def SyncRead(self, addr : int, length : int, ids : (int), echo = False) -> tuple: 
 +   """ 
 +   Sync Read instruction 
 + 
 +   parameters 
 +   ------------- 
 +   addr : int 
 +     Target item address 
 +   length : int 
 +     Number of bytes to read 
 +   ids : (int) 
 +     Target IDs 
 + 
 +   Returns 
 +   ------- 
 +   tuple 
 +     Read ID and data 
 +   """ 
 +   with self.__lock: 
 +     result = () 
 +     if addr >= 0 and addr < 65535 and length > 0 and length < (65536 - 10): 
 +       if self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_READ, W2Bs(addr) + W2Bs(length) + B2Bs(ids), echo)[1]: 
 +         for id in ids: 
 +           dat, r = self.RxPacket(echo) 
 +           if r: 
 +             if dat[4] == id: 
 +               result += (id, bytes(dat[9:9 + length])), 
 +             else: 
 +               result += (id, bytes([])), 
 +           else: 
 +             result += (id, bytes([])), 
 +     return result 
 + 
 +  def BulkWrite(self, data : (TBulkW), echo = False) -> bool: 
 +   """ 
 +   Bulk Write instruction 
 + 
 +   parameters 
 +   ------------- 
 +   data : (TBulkW) 
 +     Target ID and address, data 
 +   ids : (TSyncW) 
 +     Target ID and data 
 + 
 +   Returns 
 +   ------- 
 +   bool 
 +     Success or failure 
 +   """ 
 +   with self.__lock: 
 +     param = bytes() 
 +     for d in data: 
 +       if d.id >= 0 and d.id <= 252 and d.addr >= 0 and d.addr <= 65535: 
 +         param += B2Bs(d.id) + W2Bs(d.addr) + W2Bs(len(d.data)) + d.data 
 +       else: 
 +         del param 
 +         return False 
 +     return self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_WRITE, param, echo)[1] 
 + 
 +  def BulkRead(self, data:(TBulkR), echo = False) -> tuple: 
 +   """ 
 +   Bulk Read instruction 
 + 
 +   parameters 
 +   ------------- 
 +   data : (TBulkR) 
 +     ID, address, and number of bytes to be read 
 + 
 +   Returns 
 +   ------- 
 +   tuple 
 +     Read ID and data 
 +   """ 
 +   with self.__lock: 
 +     result = () 
 +     param = bytes() 
 +     for d in data: 
 +       if d.id < 0 or d.addr < 0 or d.length < 0: 
 +         return result 
 +       param += B2Bs(d.id) + W2Bs(d.addr) + W2Bs(d.length) 
 +     if self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_READ, param, echo)[1]: 
 +       for d in data: 
 +         rxd, r = self.RxPacket(echo) 
 +         if r: 
 +           if(d.id == rxd[4]): 
 +             result += (d.id, bytes(rxd[9:-2])), 
 +           else: 
 +             result += (d.id, bytes([])), 
 +         else: 
 +           result += (d.id, bytes([])), 
 +     del param 
 +     return result 
 + 
 +  def Ping(self, id : int, echo = False) -> bool: 
 +   with self.__lock: 
 +     if self.TxPacket(id, self.INST_PING, bytes(), echo)[1]: 
 +       rxd, r = self.RxPacket(echo) 
 +       if r: 
 +         return id == rxd[4] and rxd[5] == 7 and rxd[6] == 0 
 +     return False 
 + 
 +  def FactoryReset(self, id : int, p1 : int, echo = False) -> bool: 
 +   with self.__lock: 
 +     if self.TxPacket(id, self.INST_FACTORY_RESET, bytes((p1,)), echo)[1]: 
 +       rxd, r = self.RxPacket(echo) 
 +       if r: 
 +         return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0 
 +     return False 
 + 
 +  def Reboot(self, id : int, echo = False) -> bool: 
 +   with self.__lock: 
 +     if self.TxPacket(id, self.INST_REBOOT, bytes(), echo)[1]: 
 +       rxd, r = self.RxPacket(echo) 
 +       if r: 
 +         return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0 
 +     return False 
 + 
 +  def Clear(self, id : int, val, echo = False) -> bool: 
 +   with self.__lock: 
 +     if self.TxPacket(id, self.INST_CLEAR, B2Bs(1) + L2Bs(val), echo)[1]: 
 +       rxd, r = self.RxPacket(echo) 
 +       if r: 
 +         return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0 
 +     return False 
 + 
 +########################################​################## 
 +# test code 
 +########################################​################## 
 +if __name__ == "__main__": 
 +  from contextlib import contextmanager 
 +  from threading import Thread 
 +  import time, gc, psutil, os 
 + 
 +  ec = False 
 +  ID = 1 
 +  fin = False 
 + 
 +  @contextmanager 
 +  def stopwatch(): 
 +   start_time = time.time() 
 +   yield 
 +   #print('..proc time={:.1f}ms {}'.format((time.time() - start_time) * 1000, psutil.Process(os.getpid()).memory_info(​).rss)) 
 + 
 +  def func1(dx): 
 +   global ID, fin, ec 
 + 
 +   try: 
 +     ''' 
 +     """ reset dxl """ 
 +     with stopwatch(): 
 +       r = dx.FactoryReset(ID, 0xff, echo = ec) 
 +     print(f'FactoryReset({ID}): {r}') 
 +     time.sleep(0.5) 
 +     ''' 
 +  
 +     """ ping dxl """ 
 +     print(f'Ping({ID})=', dx.Ping(ID, echo = ec)) 
 + 
 +     """ reboot dxl """ 
 +     with stopwatch(): 
 +       r = dx.Reboot(ID, echo = ec) 
 +     print(f'Reboot({ID})={r}') 
 +     time.sleep(0.5) 
 + 
 +     """ basic packet proc """ 
 +     with stopwatch(): 
 +       with dx.mutex: 
 +         r0 = dx.TxPacket(ID, dx.INST_WRITE, (65, 0, 1)) 
 +         r1 = dx.RxPacket() 
 +     print(f'TxPacket({ID})={r0[1]}', r0[0].hex(':')) 
 +     if r1[0]: 
 +       print(f'RxPacket({ID})={r1[1]}', r1[0].hex(':')) 
 + 
 +     """ dump memory """ 
 +     l = 50 
 +     for addr in range(0, 700, l): 
 +       r = dx.Read(ID, addr, l, echo = ec) 
 +       print(f'Read({addr};{l})=',r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!') 
 + 
 +     """ read byte item """ 
 +     with stopwatch(): 
 +       r = dx.Read8(ID, 65, echo = ec) 
 +     print(f'Read8({ID})={r}') 
 + 
 +     """ sync read inst. """ 
 +     print('SyncRead=') 
 +     with stopwatch(): 
 +       d = dx.SyncRead(0, 4, (1, 2, 3, 4, 5), echo = ec) 
 +     for d in d: 
 +       print(f' ({d[0]}) 0,4 hex=', d[1].hex(':') if len(d[1]) > 0 else ()) 
 +     with stopwatch(): 
 +       d = dx.SyncRead(132, 4, (1, 2, 3, 4, 5), echo = ec) 
 +     for d in d: 
 +       print(f' ({d[0]}) 132,4 int32=', unpack('<i', pack('<I', unpack('<I', d[1])[0]))[0] if len(d[1]) > 0 else ()) 
 + 
 +     """ sync write inst. """ 
 +     for i in range(30): 
 +       with stopwatch(): 
 +         dx.SyncWrite(65, 1, (dx.TSyncW(1, B2Bs(1)), dx.TSyncW(2, B2Bs(1)), dx.TSyncW(3, B2Bs(1)), dx.TSyncW(4, B2Bs(1)), dx.TSyncW(5, B2Bs(1))), echo = ec) 
 +       time.sleep(0.05) 
 +       with stopwatch(): 
 +         dx.SyncWrite(65, 1, (dx.TSyncW(1, B2Bs(0)), dx.TSyncW(2, B2Bs(0)), dx.TSyncW(3, B2Bs(0)), dx.TSyncW(4, B2Bs(0)), dx.TSyncW(5, B2Bs(0))), echo = ec) 
 +       time.sleep(0.05) 
 + 
 +     """ set goal position """ 
 +     #torque off 
 +     if dx.Write8(ID, 64, 0, echo = ec): 
 +       #multi turn 
 +       if dx.Write8(ID, 11, 4, echo = ec): 
 +         #torque on 
 +         if dx.Write8(ID, 64, 1, echo = ec): 
 +           print(f'Write32/Read32({ID})') 
 +           for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)): 
 +             for i in range(10): 
 +               #set goal position 
 +               with stopwatch(): 
 +                 dx.Write32(ID, 116, gp, echo = ec) 
 +               #get present position 
 +               with stopwatch(): 
 +                 pp = dx.Read32(ID, 132, signed = True, echo = ec) 
 +               if pp != None: 
 +                 print(f' w={gp:6} r={pp:6} diff={gp-pp:5}', end='\r') 
 +               else: 
 +                 pass 
 +                 print('None                            ', end='\r') 
 +                 break 
 +               time.sleep(0.01) 
 +             else: 
 +               continue 
 +             pass 
 +             break 
 +           print('') 
 +           #torque off 
 +           with stopwatch(): 
 +             dx.Write8(ID, 64, 0, echo = ec) 
 +           #normal turn 
 +           with stopwatch(): 
 +             dx.Write8(ID, 11, 3, echo = ec) 
 + 
 +     """ block read/write (add/remove suffixes) """ 
 +     with stopwatch(): 
 +       r = dx.Read(ID, 120, 27, echo = ec) 
 +     if r: 
 +       print(f'Read({ID};128)=', tuple(iter_unpack('<HBBhhiiIIHB', r))[0]) 
 + 
 +     fffffd = bytes(( 
 +       0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 
 +       0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 
 +       0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 
 +       )) 
 +     with stopwatch(): 
 +       r = dx.Write(ID, 634, fffffd, echo = ec) 
 +     print(f'Write({ID})={r}') 
 +     with stopwatch(): 
 +       r = dx.Read(ID, 634, len(fffffd), echo = ec) 
 +     if r: 
 +       print(f'Read({ID})=', r.hex(':')) 
 + 
 +     """ bulk read inst. """ 
 +     with stopwatch(): 
 +       d = dx.BulkRead((dx.TBulkR(1,0,10), dx.TBulkR(2,0,20), dx.TBulkR(3,0,30), dx.TBulkR(4,0,40), dx.TBulkR(5,0,40)), echo = ec) 
 +     print('BulkRead=') 
 +     for d in d: 
 +       print(f' ({d[0]},0)', d[1].hex(':')) 
 + 
 +     dx.Write8(ID, 64, 1, echo = ec) 
 +     """ bulk write inst. """ 
 +     print('BulkWrite=') 
 +     with stopwatch(): 
 +       print(' 1', dx.BulkWrite((dx.TBulkW(1,104, L2Bs((0,0,0,1024))), dx.TBulkW(2,65,B2Bs(0)), dx.TBulkW(3,65,B2Bs(0)), dx.TBulkW(4,65,B2Bs(0))), echo = ec)) 
 +     time.sleep(0.5) 
 +     with stopwatch(): 
 +       print(' 2', dx.BulkWrite((dx.TBulkW(1,104, L2Bs(0)+L2Bs(0)+L2Bs(0)+L2Bs(2048)), dx.TBulkW(2,65,B2Bs(1)), dx.TBulkW(3,65,B2Bs(1)), dx.TBulkW(4,65,B2Bs(1))), echo = ec)) 
 +     time.sleep(0.5) 
 +     with stopwatch(): 
 +       print(' 3', dx.BulkWrite((dx.TBulkW(1,104, L2Bs(0)+L2Bs(0)+L2Bs(0)+L2Bs(1024)), dx.TBulkW(2,65,B2Bs(0)), dx.TBulkW(3,65,B2Bs(0))), echo = ec)) 
 +     time.sleep(0.5) 
 +     with stopwatch(): 
 +       print(' 4', dx.BulkWrite((dx.TBulkW(1,116, L2Bs(2048)), dx.TBulkW(2,65,B2Bs(1)), dx.TBulkW(3,65,B2Bs(1))), echo = ec)) 
 +     time.sleep(0.5) 
 +     with stopwatch(): 
 +       print(' 5', dx.BulkWrite((dx.TBulkW(1,65,bytes(0)),d​x.TBulkW(2,65,bytes(0)),dx.TBulkW(3,65,b​ytes(0))), echo = ec)) 
 + 
 +     dx.Write8(ID, 64, 0, echo = ec) 
 + 
 +   except Exception as ex: 
 +     pass 
 +     import traceback 
 +     print('--- Caught Exception ---') 
 +     traceback.print_exc() 
 +     print('------------------------') 
 +     ''' 
 +     trace = [] 
 +     tb = ex.__traceback__ 
 +     while tb is not None: 
 +       trace.append({ 
 +         "filename": tb.tb_frame.f_code.co_filename, 
 +         "name": tb.tb_frame.f_code.co_name, 
 +         "lineno": tb.tb_lineno 
 +       }) 
 +       tb = tb.tb_next 
 +     print(str({ 
 +       'type': type(ex).__name__, 
 +       'message': str(ex), 
 +       'trace': trace 
 +     })) 
 +     ''' 
 + 
 +   fin = True 
 + 
 +  def func2(dx): 
 +   global ID, fin, ec 
 + 
 +   while not fin: 
 +     try: 
 +       l = dx.Read8(ID, 65, echo = ec) 
 +       if l != None: 
 +         l ^= 1 
 +         dx.Write8(ID, 65, l, echo = ec) 
 +       time.sleep(0.05) 
 +     except Exception as ex: 
 +       import traceback 
 +       print('--- Caught Exception ---') 
 +       traceback.print_exc() 
 +       print('------------------------') 
 +       return 
 + 
 +  def func3(dx): 
 +   global ID, fin, ec 
 + 
 +   try: 
 +     """ dump memory """ 
 +     l = 20 
 +     for addr in range(0, 250, l): 
 +       r = dx.Read(ID, addr, l, echo = ec) 
 +       print(f'Read({addr};{l})=',r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!') 
 + 
 +     ''' 
 +     """ reset dxl """ 
 +     with stopwatch(): 
 +       r = dx.FactoryReset(ID, echo = ec) 
 +     print(f'FactoryReset({ID}): {r}') 
 +     time.sleep(0.5) 
 +     ''' 
 + 
 +     """ ping dxl """ 
 +     print(f'Ping({ID})=', dx.Ping(ID, echo = ec)) 
 + 
 +     """ reboot dxl """ 
 +     with stopwatch(): 
 +       r = dx.Reboot(ID, echo = ec) 
 +     print(f'Reboot({ID})={r}') 
 +     time.sleep(1) 
 + 
 +     """ basic packet proc """ 
 +     with stopwatch(): 
 +       with dx.mutex: 
 +         r0 = dx.TxPacket(ID, dx.INST_WRITE, (25, 1)) 
 +         r1 = dx.RxPacket() 
 +     print(f'TxPacket({ID})={r0[1]}', r0[0].hex(':')) 
 +     if r1[0]: 
 +       print(f'RxPacket({ID})={r1[1]}', r1[0].hex(':')) 
 + 
 +     """ read byte item """ 
 +     with stopwatch(): 
 +       r = dx.Read8(ID, 25, echo = ec) 
 +     print(f'Read8({ID})={r}') 
 + 
 +     """ sync write inst. """ 
 +     for i in range(30): 
 +       with stopwatch(): 
 +         dx.SyncWrite(25, 1, (dx.TSyncW(1, B2Bs(1)), dx.TSyncW(2, B2Bs(1)), dx.TSyncW(3, B2Bs(1))), echo = ec) 
 +       time.sleep(0.05) 
 +       with stopwatch(): 
 +         dx.SyncWrite(25, 1, (dx.TSyncW(1, B2Bs(0)), dx.TSyncW(2, B2Bs(0)), dx.TSyncW(3, B2Bs(0))), echo = ec) 
 +       time.sleep(0.05) 
 + 
 +     """ set goal position """ 
 +     #torque off 
 +     if dx.Write8(ID, 24, 0, echo = ec): 
 +       #multi turn 
 +       if dx.Write16(ID, 6, (4095, 4095), echo = ec): 
 +         #torque on 
 +         if dx.Write8(ID, 24, 1, echo = ec): 
 +           print(f'Write16/Read16({ID})') 
 +           for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)): 
 +             for i in range(10): 
 +               #set goal position 
 +               with stopwatch(): 
 +                 dx.Write16(ID, 30, gp, echo = ec) 
 +               #get present position 
 +               with stopwatch(): 
 +                 pp = dx.Read16(ID, 36, signed = True, echo = ec) 
 +               if pp != None: 
 +                 print(f' w={gp:6} r={pp:6} diff={gp-pp:5}', end='\r') 
 +               else: 
 +                 pass 
 +                 print('None                            ', end='\r') 
 +                 break 
 +               time.sleep(0.005) 
 +             else: 
 +               continue 
 +             pass 
 +             break 
 +           print('') 
 +           #torque off 
 +           dx.Write8(ID, 24, 0, echo = ec) 
 +           #normal turn 
 +           with stopwatch(): 
 +             dx.Write16(ID, 6, (0, 4095), echo = ec) 
 +   except Exception as ex: 
 +     pass 
 +     import traceback 
 +     print('--- Caught Exception ---') 
 +     traceback.print_exc() 
 +     print('------------------------') 
 + 
 +  import sys 
 +  import platform 
 +  print(sys.version) 
 + 
 +  try: 
 +   dx = DXLProtocolV2('\\\\.\\COM12', 3000000) 
 +  except: 
 +   pass 
 +  else: 
 +   th1 = Thread(target = func1, args = (dx,)) 
 +   th2 = Thread(target = func2, args = (dx,)) 
 +   th1.start() 
 +   th2.start() 
 +   th1.join() 
 +   th2.join() 
 +   del th1, th2, dx 
 + 
 +  try: 
 +   dx = DXLProtocolV1('\\\\.\\COM12', 57600) 
 +  except: 
 +   dx = None 
 +  else: 
 +   th3 = Thread(target = func3, args = (dx,)) 
 +   th3.start() 
 +   th3.join() 
 +   del th3, dx 
 + 
 +  print('fin') 
 +</pre> 
 +}} 
 + 
 +****Pythonその3 [#v9d3114a] 
 +[[Python2>#k91ab321]]ついでに[[MicroPython>https://micropython.org/]]​の互換性レベルを図ってみた。~ 
 +欲しいライブラリが用意されていない以上に、基本的な部分が結構緩い。また実際に動かすとそれなりに遅い。それでもMicroPythonは存在しているし、個人的にはかつてのBASICの感覚が蘇ってきたので、ちょっとした使い方であれば選択肢に入れても良いかなと。~ 
 +ひとまず[[PicoSHIELD]]+[[Raspberry Pi Pico>https://micropython.org/download/RP​I_PICO/]]上で動くように修正してみた。オーバーロックしているのはご愛敬として、マルチスレッドを除外した程度で実質PC版と大差無い。@micropython.viperを多用した時はかなり高速化できていたのだが、ここはコードの互換性を優先。 
 +#html{{ 
 +<pre class="brush: python;"  title="pyDXL.py"> 
 +# pyDXL.py for MicroPython(RP2040) 
 +# SPDX-License-Identifier: MIT 
 +# SPDX-FileCopyrightText: (C) 2024 mukyokyo 
 + 
 + 
 +from machine import UART, Pin 
 +import time, array 
 +from collections import namedtuple 
 +from struct import pack, unpack 
 + 
 +########################################​################## 
 +# Functionalized the part of converting int to bytes. 
 +# If specified as a tuple, it is converted to bytes at once. 
 +########################################​################## 
 +@micropython.native 
 +def B2Bs(d) -> bytes: 
 +  if isinstance(d, list) or isinstance(d, tuple): 
 +   return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d) 
 +  else: 
 +   return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d)) 
 + 
 +@micropython.native 
 +def W2Bs(d) -> bytes: 
 +  if isinstance(d, list) or isinstance(d, tuple): 
 +   return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]]) 
 +  else: 
 +   return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d)) 
 + 
 +@micropython.native 
 +def L2Bs(d) -> bytes: 
 +  if isinstance(d, list) or isinstance(d, tuple): 
 +   return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]]) 
 +  else: 
 +   return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d)) 
 + 
 +########################################​################## 
 +# API for Dynamixel protocol V1 
 +########################################​################## 
 +class DXLProtocolV1: 
 +  BROADCASTING_ID = 0xfe 
 +  INST_PING          = 0x01 
 +  INST_READ          = 0x02 
 +  INST_WRITE          = 0x03 
 +  INST_REG_WRITE      = 0x04 
 +  INST_ACTION        = 0x05 
 +  INST_FACTORY_RESET  = 0x06 
 +  INST_REBOOT        = 0x08 
 +  INST_SYNC_WRITE    = 0x83 
 +  INST_SYNG_REG_WRITE = 0x85 
 + 
 +  TSyncW = namedtuple("TSyncW", ("id", ("data"))) 
 + 
 +  def __init__(self, baudrate = 57600, timeout_ms = 50): 
 +   """ 
 +   Initalize 
 + 
 +   parameters 
 +   ------------- 
 +   port : int 
 +     UART no. 
 +   baudrate : int 
 +     Serial baudrate[bps] 
 +   timeout : float 
 +     Read timeout[s] 
 +   """ 
 +   self.__Error = 0 
 +   self.__baudrate = baudrate 
 +   self.__timeout = timeout_ms 
 +   self.__serial = UART(1, baudrate, tx = Pin(4), rx = Pin(5), txbuf = 500, rxbuf = 500, timeout = timeout_ms) 
 + 
 +  @property 
 +  def baudrate(self): 
 +   return self.__baudrate 
 + 
 +  @baudrate.setter 
 +  def baudrate(self, baudrate): 
 +   self.__baudrate = baudrate 
 +   self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout) 
 + 
 +  @property 
 +  def timeout(self): 
 +   return self.__timeout 
 + 
 +  @timeout.setter 
 +  def timeout(self, timeout_ms): 
 +   self.__timeout = timeout_ms 
 +   self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout) 
 + 
 +  @property 
 +  def Error(self): 
 +   return self.__Error 
 + 
 +  @micropython.native 
 +  def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> (bytes, bool): 
 +   """ 
 +   Sending packets 
 + 
 +   parameters 
 +   ------------- 
 +   id : int 
 +     Target ID 
 +   inst : int 
 +     Instruction command 
 +   param : bytes 
 +     Packet parameters 
 + 
 +   Returns 
 +   ------- 
 +   bytes 
 +     Packets sent 
 +   bool 
 +     Success or failure 
 +   """ 
 +   if ((id == self.BROADCASTING_ID) or (id >= 0 and id <= 253)) and len(param) <= (256 - 6): 
 +     instp = bytearray([0xff,0xff,id,0,inst]) + bytes(param) 
 +     instp[3] = len(instp) - 3 
 +     instp += B2Bs(~sum(instp[2:]) & 0xff) 
 +     _ = self.__serial.read(self.__serial.any()) ​
 +     if echo: print('TX:', instp.hex(':')) 
 +     self.__serial.write(instp) 
 +     return bytes(instp), True 
 +   return None, False 
 + 
 +  @micropython.native 
 +  def __rx(self, length) -> bytes: 
 +   s = self.__serial.read(length) 
 +   if s: 
 +     l = len(s) 
 +     if l == length: 
 +       return s 
 +     else: 
 +       r = s 
 +       length -= l 
 +       while self.__serial.any() > 0: 
 +         s = self.__serial.read(length) 
 +         r += s 
 +         length -= len(s) 
 +         if length == 0: 
 +           break 
 +       return r 
 +   else: 
 +     return bytes() 
 + 
 +  @micropython.native 
 +  def RxPacket(self, echo = False) -> (bytes, bool): 
 +   """ 
 +   Receiving packets 
 + 
 +   Returns 
 +   ------- 
 +   bytes 
 +     Packets received 
 +   bool 
 +     Success or failure 
 +   """ 
 +   self.__serial.flush() 
 +   statp = self.__rx(5) 
 +   if statp: 
 +     if len(statp) == 5: 
 +       l = statp[3] - 1 
 +       self.__Error = statp[4] 
 +       statp += self.__rx(l) 
 +       if len(statp) == l + 5: 
 +         if statp[-1:][0] == ((~sum(statp[2:-1])) & 0xff): 
 +           if echo: print('RX:', statp.hex(':')) 
 +           return bytes(statp), (statp[4] & 0x40) == 0 
 +   return None, False 
 + 
 +  @micropython.native 
 +  def Write(self, id : int, addr : int, data : bytes, echo = False) -> bool: 
 +   """ 
 +   Write instruction 
 + 
 +   parameters 
 +   ------------- 
 +   id : int 
 +     Target ID 
 +   addr : int 
 +     Target item address 
 +   data : bytes 
 +     Data to be written 
 + 
 +   Returns 
 +   ------- 
 +   result : bool 
 +     Success or failure 
 +   """ 
 +   if id >= 0 and id <= self.BROADCASTING_ID and addr >= 0  and addr <= 254: 
 +     if self.TxPacket(id, self.INST_WRITE, B2Bs(addr) + data, echo)[1]: 
 +       if id != self.BROADCASTING_ID: 
 +         dat, r = self.RxPacket(echo) 
 +         if r: 
 +           return dat[2] == id and (dat[4] & 0x18) == 0 
 +       else: 
 +         return True 
 +   return False 
 + 
 +  @micropython.native 
 +  def Write8(self, id : int, addr : int, data : int | tuple | list, echo = False) -> bool: 
 +   return self.Write(id, addr, B2Bs(data), echo) 
 + 
 +  @micropython.native 
 +  def Write16(self, id : int, addr : int, data : int | tuple | list, echo = False) -> bool: 
 +   return self.Write(id, addr, W2Bs(data), echo) 
 + 
 +  @micropython.native 
 +  def Write32(self, id : int, addr : int, data : int | tuple | list, echo = False) -> bool: 
 +   return self.Write(id, addr, L2Bs(data), echo) 
 + 
 +  @micropython.native 
 +  def Read(self, id : int, addr : int, length : int, echo = False) -> (bytes, bool): 
 +   """ 
 +   Read instruction 
 + 
 +   parameters 
 +   ------------- 
 +   id : int 
 +     Target ID 
 +   addr : int 
 +     Target item address 
 +   length : int 
 +     Number of bytes to read 
 + 
 +   Returns 
 +   ------- 
 +   bytes 
 +     Data read 
 +   bool 
 +     Success or failure 
 +   """ 
 +   if id >= 0 and id <= 253 and addr >= 0 and addr <= 254 and length > 0 and length <= (256 - 6): 
 +     if self.TxPacket(id, self.INST_READ, B2Bs((addr, length)), echo)[1]: 
 +       dat, r = self.RxPacket(echo) 
 +       if r: 
 +         if dat[2] == id and (dat[4] & 0x8) == 0: 
 +           return bytes(dat[5:-1]) 
 +   return None 
 + 
 +  @micropython.native 
 +  def Read8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
 +   r = self.Read(id, addr, length, echo) 
 +   if r != None: 
 +     n = unpack('<' + (('b'*length) if signed else ('B'*length)), r[0:1 * length]) 
 +     return n if length > 1 else n[0] 
 +   return None 
 + 
 +  @micropython.native 
 +  def Read16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
 +   r = self.Read(id, addr, 2 * length, echo) 
 +   if r != None: 
 +     n = unpack('<' + (('h'*length) if signed else ('H'*length)), r[0:2 * length]) 
 +     return n if length > 1 else n[0] 
 +   return None 
 + 
 +  @micropython.native 
 +  def Read32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
 +   r = self.Read(id, addr, 4 * length, echo) 
 +   if r != None: 
 +     n = unpack('<' + (('i'*length) if signed else ('I'*length)), r[0:4 * length]) 
 +     return n if length > 1 else n[0] 
 +   return None 
 + 
 +  @micropython.native 
 +  def SyncWrite(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool: 
 +   """ 
 +   Sync Write instruction 
 + 
 +   parameters 
 +   ------------- 
 +   addr : int 
 +     Target item address 
 +   length : int 
 +     Number of bytes to write 
 +   id_datas : (TSyncW) 
 +     Target ID and data 
 + 
 +   Returns 
 +   ------- 
 +   bool 
 +     Success or failure 
 +   """ 
 +   if addr >= 0 and addr <= 254 and length > 0 and length < (256 - 6): 
 +     param = B2Bs((addr,length)) 
 +     for d in id_datas: 
 +       param += B2Bs(d.id) + d.data 
 +       if len(d.data) != length or d.id < 0 or d.id > 253: 
 +         del param 
 +         return False 
 +     del param 
 +     return self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)[1] 
 +   return False 
 + 
 +  @micropython.native 
 +  def Ping(self, id : int, echo = False) -> bool: 
 +   if self.TxPacket(id, self.INST_PING, bytes(), echo): 
 +     dat, r = self.RxPacket(echo) 
 +     if r: 
 +       return id == dat[2] and dat[3] == 2 
 +   return False 
 + 
 +  @micropython.native 
 +  def FactoryReset(self, id : int, echo = False) -> bool: 
 +   if self.TxPacket(id, self.INST_RESET, bytes(), echo): 
 +     dat, r = self.RxPacket(echo) 
 +     if r: 
 +       return id == dat[2] and dat[3] == 2 
 +   return False 
 + 
 +  @micropython.native 
 +  def Reboot(self, id : int, echo = False) -> bool: 
 +   if self.TxPacket(id, self.INST_REBOOT, bytes(), echo): 
 +     dat, r = self.RxPacket(echo) 
 +     if r: 
 +       return id == dat[2] and dat[3] == 2 
 +   return False 
 + 
 +########################################​################## 
 +# API for Dynamixel protocol V2 
 +########################################​################## 
 +class DXLProtocolV2: 
 +  BROADCASTING_ID = const(0xfe) 
 +  INST_PING                = const(0x01) 
 +  INST_READ                = const(0x02) 
 +  INST_WRITE                = const(0x03) 
 +  INST_REG_WRITE            = const(0x04) 
 +  INST_ACTION              = const(0x05) 
 +  INST_FACTORY_RESET        = const(0x06) 
 +  INST_REBOOT              = const(0x08) 
 +  INST_SYS_WRITE            = const(0x0d) 
 +  INST_CLEAR                = const(0x10) 
 +  INST_CONTROL_TABLE_BACKUP = const(0x20) 
 +  INST_STATUS              = const(0x55) 
 +  INST_SYNC_READ            = const(0x82) 
 +  INST_SYNC_WRITE          = const(0x83) 
 +  INST_SYNG_REG_WRITE      = const(0x85) 
 +  INST_FAST_SYNC_READ      = const(0x8a) 
 +  INST_BULK_READ            = const(0x92) 
 +  INST_BULK_WRITE          = const(0x93) 
 +  INST_FAST_BULK_READ      = const(0x9a) 
 + 
 +  TSyncW = namedtuple("TSyncW", ("id", ("data"))) 
 +  TBulkW = namedtuple("TBulkW", ("id", "addr", ("data"))) 
 +  TBulkR = namedtuple("TBulkR", ("id", "addr", "length")) 
 + 
 +  __crc16_lutable = array.array('H', ( 
 +   0x0000, 0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014, 0x8011, 
 +   0x8033, 0x0036, 0x003C, 0x8039, 0x0028, 0x802D, 0x8027, 0x0022, 
 +   0x8063, 0x0066, 0x006C, 0x8069, 0x0078, 0x807D, 0x8077, 0x0072, 
 +   0x0050, 0x8055, 0x805F, 0x005A, 0x804B, 0x004E, 0x0044, 0x8041, 
 +   0x80C3, 0x00C6, 0x00CC, 0x80C9, 0x00D8, 0x80DD, 0x80D7, 0x00D2, 
 +   0x00F0, 0x80F5, 0x80FF, 0x00FA, 0x80EB, 0x00EE, 0x00E4, 0x80E1, 
 +   0x00A0, 0x80A5, 0x80AF, 0x00AA, 0x80BB, 0x00BE, 0x00B4, 0x80B1, 
 +   0x8093, 0x0096, 0x009C, 0x8099, 0x0088, 0x808D, 0x8087, 0x0082, 
 +   0x8183, 0x0186, 0x018C, 0x8189, 0x0198, 0x819D, 0x8197, 0x0192, 
 +   0x01B0, 0x81B5, 0x81BF, 0x01BA, 0x81AB, 0x01AE, 0x01A4, 0x81A1, 
 +   0x01E0, 0x81E5, 0x81EF, 0x01EA, 0x81FB, 0x01FE, 0x01F4, 0x81F1, 
 +   0x81D3, 0x01D6, 0x01DC, 0x81D9, 0x01C8, 0x81CD, 0x81C7, 0x01C2, 
 +   0x0140, 0x8145, 0x814F, 0x014A, 0x815B, 0x015E, 0x0154, 0x8151, 
 +   0x8173, 0x0176, 0x017C, 0x8179, 0x0168, 0x816D, 0x8167, 0x0162, 
 +   0x8123, 0x0126, 0x012C, 0x8129, 0x0138, 0x813D, 0x8137, 0x0132, 
 +   0x0110, 0x8115, 0x811F, 0x011A, 0x810B, 0x010E, 0x0104, 0x8101, 
 +   0x8303, 0x0306, 0x030C, 0x8309, 0x0318, 0x831D, 0x8317, 0x0312, 
 +   0x0330, 0x8335, 0x833F, 0x033A, 0x832B, 0x032E, 0x0324, 0x8321, 
 +   0x0360, 0x8365, 0x836F, 0x036A, 0x837B, 0x037E, 0x0374, 0x8371, 
 +   0x8353, 0x0356, 0x035C, 0x8359, 0x0348, 0x834D, 0x8347, 0x0342, 
 +   0x03C0, 0x83C5, 0x83CF, 0x03CA, 0x83DB, 0x03DE, 0x03D4, 0x83D1, 
 +   0x83F3, 0x03F6, 0x03FC, 0x83F9, 0x03E8, 0x83ED, 0x83E7, 0x03E2, 
 +   0x83A3, 0x03A6, 0x03AC, 0x83A9, 0x03B8, 0x83BD, 0x83B7, 0x03B2, 
 +   0x0390, 0x8395, 0x839F, 0x039A, 0x838B, 0x038E, 0x0384, 0x8381, 
 +   0x0280, 0x8285, 0x828F, 0x028A, 0x829B, 0x029E, 0x0294, 0x8291, 
 +   0x82B3, 0x02B6, 0x02BC, 0x82B9, 0x02A8, 0x82AD, 0x82A7, 0x02A2, 
 +   0x82E3, 0x02E6, 0x02EC, 0x82E9, 0x02F8, 0x82FD, 0x82F7, 0x02F2, 
 +   0x02D0, 0x82D5, 0x82DF, 0x02DA, 0x82CB, 0x02CE, 0x02C4, 0x82C1, 
 +   0x8243, 0x0246, 0x024C, 0x8249, 0x0258, 0x825D, 0x8257, 0x0252, 
 +   0x0270, 0x8275, 0x827F, 0x027A, 0x826B, 0x026E, 0x0264, 0x8261, 
 +   0x0220, 0x8225, 0x822F, 0x022A, 0x823B, 0x023E, 0x0234, 0x8231, 
 +   0x8213, 0x0216, 0x021C, 0x8219, 0x0208, 0x820D, 0x8207, 0x0202 
 +  )) 
 + 
 +  def __init__(self, baudrate = 57600, timeout_ms = 50): 
 +   """ 
 +   Initalize 
 + 
 +   parameters 
 +   ------------- 
 +   port : int 
 +     UART no. 
 +   baudrate : int 
 +     Serial baudrate[bps] 
 +   timeout : float 
 +     Read timeout[s] 
 +   """ 
 +   self.__Error = 0 
 +   self.__baudrate = baudrate 
 +   self.__timeout = timeout_ms 
 +   self.__serial = UART(1, baudrate, tx = Pin(4), rx = Pin(5), txbuf = 500, rxbuf = 500, timeout = timeout_ms) 
 + 
 +  @property 
 +  def baudrate(self): 
 +   return self.__baudrate 
 + 
 +  @baudrate.setter 
 +  def baudrate(self, baudrate): 
 +   self.__baudrate = baudrate 
 +   self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout) 
 + 
 +  @property 
 +  def timeout(self): 
 +   return self.__timeout 
 + 
 +  @timeout.setter 
 +  def timeout(self, timeout_ms): 
 +   self.__timeout = timeout_ms 
 +   self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout) 
 + 
 +  @property 
 +  def Error(self): 
 +   return self.__Error 
 + 
 +  @micropython.viper 
 +  def __crc16(self, dat : ptr8, l:int) -> int: 
 +   crc:int = 0 
 +   pt = ptr16(self.__crc16_lutable) 
 +   for i in range(l): 
 +     crc = (crc << 8) ^ pt[(((crc >> 8) ^ dat[i]) & 0xff)] 
 +   return crc & 0xffff 
 + 
 +  @micropython.native 
 +  def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> (bytes, bool): 
 +   """ 
 +   Sending packets 
 + 
 +   parameters 
 +   ------------- 
 +   id : int 
 +     Target ID 
 +   inst : int 
 +     Instruction command 
 +   param : bytes 
 +     Packet parameters 
 + 
 +   Returns 
 +   ------- 
 +   bytes 
 +     Packets sent 
 +   bool 
 +     Success or failure 
 +   """ 
 +   if ((id == self.BROADCASTING_ID) or (id >= 0 and id <= 252)) and len(param) < (65536 - 10): 
 +     instp = bytearray([0xff,0xff,0xfd,0x00,id,0,0,in​st]) + bytearray(param).replace(b'\xff\xff\xfd'​,b'\xff\xff\xfd\xfd') 
 +     instp[5:7] = W2Bs(len(instp) - 5) 
 +     instp += W2Bs(self.__crc16(instp, len(instp))) 
 +     _ = self.__serial.read(self.__serial.any()) ​
 +     if echo: print('TX:', instp.hex(':')) 
 +     self.__serial.write(instp) 
 +     return bytes(instp), True 
 +   return None, False 
 + 
 +  @micropython.native 
 +  def __rx(self, length) -> bytes: 
 +   s = self.__serial.read(length) 
 +   if s: 
 +     l = len(s) 
 +     if l == length: 
 +       return s 
 +     else: 
 +       r = s 
 +       length -= l 
 +       while self.__serial.any() > 0: 
 +         s = self.__serial.read(length) 
 +         r += s 
 +         length -= len(s) 
 +         if length == 0: 
 +           break 
 +       return r 
 +   else: 
 +     return bytes() 
 + 
 +  @micropython.native 
 +  def RxPacket(self, echo = False) -> (bytes, bool): 
 +   """ 
 +   Receiving packets 
 + 
 +   Returns 
 +   ------- 
 +   bytes 
 +     Packets received 
 +   bool 
 +     Success or failure 
 +   """ 
 +   self.__serial.flush() 
 +   statp = self.__rx(9) 
 +   if statp: 
 +     if len(statp) == 9: 
 +       if statp[0] == 0xff and statp[1] == 0xff and statp[2] == 0xfd and statp[3] == 0 and statp[7] == 0x55: 
 +         l = unpack('<H', statp[5:7])[0] - 2 
 +         self.__Error = statp[8] 
 +         statp += self.__rx(l) 
 +         if len(statp) == l + 9: 
 +           if unpack('<H', statp[-2:])[0] == self.__crc16(statp[:-2], len(statp[:-2])): 
 +             statp = statp[0:9] + statp[9:].replace(b'\xff\xff\xfd\xfd',b'​\xff\xff\xfd') 
 +             if echo: print('RX:', statp.hex(':')) 
 +             return bytes(statp), (statp[8] & 0x7f) == 0 
 +   return None, False 
 + 
 +  @micropython.native 
 +  def Write(self, id : int, addr : int, data: bytes, echo = False) -> bool: 
 +   """ 
 +   Write instruction 
 + 
 +   parameters 
 +   ------------- 
 +   id : int 
 +     Target ID 
 +   addr : int 
 +     Target item address 
 +   data : bytes 
 +     Data to be written 
 + 
 +   Returns 
 +   ------- 
 +   result : bool 
 +     Success or failure 
 +   """ 
 +   if ((id >= 0 and id <= 252) or (id == self.BROADCASTING_ID)) and addr >= 0 and addr <= 65535: 
 +     if self.TxPacket(id, self.INST_WRITE, W2Bs(addr) + data, echo)[1]: 
 +       if id != self.BROADCASTING_ID: 
 +         dat, r = self.RxPacket(echo) 
 +         if r: 
 +           return dat[4] == id and (dat[8] & 0x7f) == 0 
 +         else: 
 +           return True 
 +   return False 
 + 
 +  @micropython.native 
 +  def Write8(self, id : int, addr : int, data : int | tuple | list, echo = False): 
 +   return self.Write(id, addr, B2Bs(data), echo) 
 + 
 +  @micropython.native 
 +  def Write16(self, id : int, addr : int, data : int | tuple | list, echo = False): 
 +   return self.Write(id, addr, W2Bs(data), echo) 
 + 
 +  @micropython.native 
 +  def Write32(self, id : int, addr : int, data : int | tuple | list, echo = False): 
 +   return self.Write(id, addr, L2Bs(data), echo) 
 + 
 +  @micropython.native 
 +  def Read(self, id : int, addr : int, length : int, echo = False) -> (bytes, bool): 
 +   """ 
 +   Read instruction 
 + 
 +   parameters 
 +   ------------- 
 +   id : int 
 +     Target ID 
 +   addr : int 
 +     Target item address 
 +   length : int 
 +     Number of bytes to read 
 + 
 +   Returns 
 +   ------- 
 +   bytes 
 +     Data read 
 +   bool 
 +     Success or failure 
 +   """ 
 +   if id >= 0 and id <= 252 and addr >= 0 and addr <= 65535 and length > 0 and length < (65536 - 10): 
 +     if self.TxPacket(id, self.INST_READ, W2Bs(addr) + W2Bs(length), echo)[1]: 
 +       dat, r = self.RxPacket(echo) 
 +       if r: 
 +         return bytes(dat[9:-2]) 
 +   return None 
 + 
 +  @micropython.native 
 +  def Read8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
 +   r = self.Read(id, addr, length, echo) 
 +   if r: 
 +     n = unpack('<' + (('b'*length) if signed else ('B'*length)), r[0:1 * length]) 
 +     return n if length > 1 else n[0] 
 +   return None 
 + 
 +  @micropython.native 
 +  def Read16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
 +   r = self.Read(id, addr, 2 * length, echo) 
 +   if r != None: 
 +     n = unpack('<' + (('h'*length) if signed else ('H'*length)), r[0:2 * length]) 
 +     return n if length > 1 else n[0] 
 +   return None 
 + 
 +  @micropython.native 
 +  def Read32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
 +   r = self.Read(id, addr, 4 * length, echo) 
 +   if r != None: 
 +     n = unpack('<' + (('i'*length) if signed else ('I'*length)), r[0:4 * length]) 
 +     return n if length > 1 else n[0] 
 +   return None 
 + 
 +  @micropython.native 
 +  def SyncWrite(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool: 
 +   """ 
 +   Sync Write instruction 
 + 
 +   parameters 
 +   ------------- 
 +   addr : int 
 +     Target item address 
 +   length : int 
 +     Number of bytes to write 
 +   id_datas : (TSyncW) 
 +     Target ID and data 
 + 
 +   Returns 
 +   ------- 
 +   bool 
 +     Success or failure 
 +   """ 
 +   if addr >= 0 and addr <= 65535 and length > 0 and length < (65536 - 10): 
 +     param = W2Bs(addr) + W2Bs(length) 
 +     for d in id_datas: 
 +       param += bytes((d.id,)) + d.data 
 +       if len(d.data) != length or d.id < 0 or d.id > 252: 
 +         del param 
 +         return False 
 +     return self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)[1] 
 +   return False 
 + 
 +  @micropython.native 
 +  def SyncRead(self, addr : int, length : int, ids : (int), echo = False) -> tuple: 
 +   """ 
 +   Sync Read instruction 
 + 
 +   parameters 
 +   ------------- 
 +   addr : int 
 +     Target item address 
 +   length : int 
 +     Number of bytes to read 
 +   ids : (int) 
 +     Target IDs 
 + 
 +   Returns 
 +   ------- 
 +   tuple 
 +     Read ID and data 
 +   """ 
 +   result = () 
 +   if addr >= 0 and addr < 65535 and length > 0 and length < (65536 - 10): 
 +     if self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_READ, W2Bs(addr) + W2Bs(length) + B2Bs(ids), echo)[1]: 
 +       for id in ids: 
 +         dat, r = self.RxPacket(echo) 
 +         if r: 
 +           if dat[4] == id: 
 +             result += (id, bytes(dat[9:9 + length])), 
 +           else: 
 +             result += (id, bytes([])), 
 +         else: 
 +           result += (id, bytes([])), 
 +   return result 
 + 
 +  @micropython.native 
 +  def BulkWrite(self, data : (TBulkW), echo = False) -> bool: 
 +   """ 
 +   Bulk Write instruction 
 + 
 +   parameters 
 +   ------------- 
 +   addr : int 
 +     Target item address 
 +   length : int 
 +     Number of bytes to write 
 +   ids : (TSyncW) 
 +     Target ID and data 
 + 
 +   Returns 
 +   ------- 
 +   bool 
 +     Success or failure 
 +   """ 
 +   param = bytes() 
 +   for d in data: 
 +     if d.id >= 0 and d.id <= 252 and d.addr >= 0 and d.addr <= 65535: 
 +       param += B2Bs(d.id) + W2Bs(d.addr) + W2Bs(len(d.data)) + d.data 
 +     else: 
 +       del param 
 +       return False 
 +   return self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_WRITE, param, echo)[1] 
 + 
 +  @micropython.native 
 +  def BulkRead(self, data:(TBulkR), echo = False) -> tuple: 
 +   """ 
 +   Bulk Read instruction 
 + 
 +   parameters 
 +   ------------- 
 +   data : (TBulkR) 
 +     ID, address, and number of bytes to be read 
 + 
 +   Returns 
 +   ------- 
 +   tuple 
 +     Read ID and data 
 +   """ 
 +   result = () 
 +   param = bytes() 
 +   for d in data: 
 +     if d.id < 0 or d.addr < 0 or d.length < 0: 
 +       return result 
 +     param += B2Bs(d.id) + W2Bs(d.addr) + W2Bs(d.length) 
 +   if self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_READ, param, echo)[1]: 
 +     for d in data: 
 +       rxd, r = self.RxPacket(echo) 
 +       if r: 
 +         if(d.id == rxd[4]): 
 +           result += (d.id, bytes(rxd[9:-2])), 
 +         else: 
 +           result += (d.id, bytes([])), 
 +       else: 
 +         result += (d.id, bytes([])), 
 +   del param 
 +   return result 
 + 
 +  @micropython.native 
 +  def Ping(self, id : int, echo = False) -> bool: 
 +   if self.TxPacket(id, self.INST_PING, bytes(), echo)[1]: 
 +     rxd, r = self.RxPacket(echo) 
 +     if r: 
 +       return id == rxd[4] and rxd[5] == 7 and rxd[6] == 0 
 +   return False 
 + 
 +  @micropython.native 
 +  def FactoryReset(self, id : int, p1 : int, echo = False) -> bool: 
 +   if self.TxPacket(id, self.INST_FACTORY_RESET, B2Bs(p1), echo)[1]: 
 +     rxd, r = self.RxPacket(echo) 
 +     if r: 
 +       return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0 
 +   return False 
 + 
 +  @micropython.native 
 +  def Reboot(self, id : int, echo = False) -> bool: 
 +   if self.TxPacket(id, self.INST_REBOOT, bytes(), echo)[1]: 
 +     rxd, r = self.RxPacket(echo) 
 +     if r: 
 +       return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0 
 +   return False 
 + 
 +  @micropython.native 
 +  def Clear(self, id : int, val, echo = False) -> bool: 
 +   if self.TxPacket(id, self.INST_CLEAR, B2Bs(1) + L2Bs(val), echo)[1]: 
 +     rxd, r = self.RxPacket(echo) 
 +     if r: 
 +       return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0 
 +   return False 
 + 
 +########################################​################## 
 +# test code 
 +########################################​################## 
 +if __name__ == "__main__": 
 +  import uasyncio as asyncio 
 +  from ucontextlib import contextmanager 
 +  import gc 
 + 
 +  try: 
 +   led = Pin('LED', Pin.OUT) 
 +  except TypeError: 
 +   led = Pin(25, Pin.OUT) 
 +  led.off() 
 + 
 +  dx = None 
 +  ec = False 
 +  ID = 1 
 +  fin = False 
 + 
 +  async def test1(): 
 +   global dx, ec, ID, fin 
 +   @contextmanager 
 +   def stopwatch(): 
 +     led.toggle() 
 +     start_time = time.ticks_us() 
 +     yield 
 +     #print('..proc time={:.2f}ms {}'.format((time.ticks_us() - start_time) / 1000, gc.mem_alloc())) 
 + 
 +   # Basic operation check 
 +   try: 
 +     ''' 
 +     with stopwatch(): 
 +       r = dx.FactoryReset(ID, echo=ec) 
 +     print(f'Reset({ID}): {r}') 
 +     dx.baudrate = 57600 
 +     ID = 1 
 +     await asyncio.sleep(0.5) 
 +     ''' 
 + 
 +     """ ping dxl """ 
 +     for i in range(10): 
 +       with stopwatch(): 
 +         print(f'Ping({i}):', dx.Ping(i, echo=ec)) 
 + 
 +     """ reboot dxl """ 
 +     with stopwatch(): 
 +       r = dx.Reboot(ID, echo=ec) 
 +     print(f'Reboot({ID})= {r}') 
 +     await asyncio.sleep(0.5) 
 + 
 +     """ basic packet proc """ 
 +     with stopwatch(): 
 +       r0 = dx.TxPacket(ID, dx.INST_WRITE, (65, 0, 1)) 
 +       r1 = dx.RxPacket() 
 +     print('TxPacket({})= {} {}'.format(ID, r0[1], r0[0].hex(':') if r0[0] else '--')) 
 +     print('RxPacket({})= {} {}'.format(ID, r1[1], r1[0].hex(':') if r1[0] else '--')) 
 + 
 +     """ dump memory """ 
 +     l = 50 
 +     for addr in range(0, 700, l): 
 +       r = dx.Read(ID, addr, l, echo = ec) 
 +       print(f'Read({addr};{l})=',r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!') 
 + 
 +     """ read byte item """ 
 +     with stopwatch(): 
 +       r = dx.Read8(ID, 65, echo=ec) 
 +     print(f'Read8({ID},65)= {r}') 
 +     """ read word item """ 
 +     with stopwatch(): 
 +       r = dx.Read16(ID, 120, echo=ec) 
 +     print(f'Read16({ID},120)= {r}') 
 +     """ read long item """ 
 +     with stopwatch(): 
 +       r = dx.Read32(ID, 132, echo=ec) 
 +     print(f'Read32({ID},132)= {r}') 
 + 
 +     """ sync read inst. """ 
 +     with stopwatch(): 
 +       d = dx.SyncRead(0, 4, (1, 2, 3, 4, 5, 6, 7, 8), echo=ec) 
 +     print('SyncRead=') 
 +     for d in d: 
 +       print(f' ({d[0]},0)', d[1].hex(':') if len(d[1]) > 0 else ()) 
 + 
 +     """ sync write inst. """ 
 +     for i in range(20): 
 +       with stopwatch(): 
 +         dx.SyncWrite(65, 1, (dx.TSyncW(1, B2Bs(1)), dx.TSyncW(2, B2Bs(1)), dx.TSyncW(3, B2Bs(1))), echo=ec) 
 +       await asyncio.sleep(0.05) 
 +       with stopwatch(): 
 +         dx.SyncWrite(65, 1, (dx.TSyncW(1, B2Bs(0)), dx.TSyncW(2, B2Bs(0)), dx.TSyncW(3, B2Bs(0))), echo=ec) 
 +       await asyncio.sleep(0.05) 
 + 
 +     """ set goal position """ 
 +     #torque off 
 +     with stopwatch(): 
 +       dx.Write8(ID, 64, 0, echo=ec) 
 +     #multi turn 
 +     with stopwatch(): 
 +       dx.Write8(ID, 11, 4, echo=ec) 
 +     #torque on 
 +     with stopwatch(): 
 +       dx.Write8(ID, 64, 1, echo=ec) 
 +     for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)): 
 +       #set goal potition 
 +       with stopwatch(): 
 +         dx.Write32(ID, 116, gp, echo=ec) 
 +       #get present potition 
 +       with stopwatch(): 
 +         pp = dx.Read32(ID, 132, signed = True, echo=ec) 
 +       if pp: 
 +         print(f'Goal Pos={gp:6d}, Present Pos={pp:6d}', end='\r') 
 +       else: 
 +         break 
 +       await asyncio.sleep(0.05) 
 +     else: 
 +       print('') 
 +     #torque off 
 +     with stopwatch(): 
 +       dx.Write8(ID, 64, 0, echo=ec) 
 +     #normal turn 
 +     with stopwatch(): 
 +       dx.Write8(ID, 11, 3, echo=ec) 
 + 
 +     """ test add/remove suffixes """ 
 +     fffffd = bytes(( 
 +       0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 
 +       0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 
 +       0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd, 
 +     )) 
 +     with stopwatch(): 
 +       r = dx.Write(ID, 634, fffffd, echo=ec) 
 +     print(f'Write({ID},634)= {r}') 
 +     with stopwatch(): 
 +       r = dx.Read(ID, 634, len(fffffd), echo=ec) 
 +     print(f'Read({ID},634)=', r.hex(':') if r else '--') 
 + 
 +     """ bulk read inst. """ 
 +     with stopwatch(): 
 +       d = dx.BulkRead((dx.TBulkR(1, 0, 10), dx.TBulkR(2, 0, 20), dx.TBulkR(3, 0, 30), dx.TBulkR(4, 0, 40), dx.TBulkR(6, 0, 50)), echo=ec) 
 +     print('BulkRead=') 
 +     for d in d: 
 +       print(f' ({d[0]},0)', d[1].hex(':')) 
 + 
 +     """ bulk write inst. """ 
 +     dx.Write8(ID, 64, 1, echo=ec) 
 +     with stopwatch(): 
 +       dx.BulkWrite((dx.TBulkW(1, 104, L2Bs((0,0,0,1024))), dx.TBulkW(2, 65, B2Bs(0)), dx.TBulkW(3, 65, B2Bs(0))), echo=ec) 
 +     await asyncio.sleep(0.5) 
 +     with stopwatch(): 
 +       dx.BulkWrite((dx.TBulkW(1, 104, L2Bs(0)+L2Bs(0)+L2Bs(0)+L2Bs(2048)), dx.TBulkW(2, 65, B2Bs(1)), dx.TBulkW(3, 65, B2Bs(1))), echo=ec) 
 +     await asyncio.sleep(0.5) 
 +     with stopwatch(): 
 +       dx.BulkWrite((dx.TBulkW(1, 104, L2Bs(0)+L2Bs(0)+L2Bs(0)+L2Bs(1024)), dx.TBulkW(2, 65, B2Bs(0)), dx.TBulkW(3, 65, B2Bs(0))), echo=ec) 
 +     await asyncio.sleep(0.5) 
 +     with stopwatch(): 
 +       dx.BulkWrite((dx.TBulkW(1, 116, L2Bs(2048)), dx.TBulkW(2,65, B2Bs(1)), dx.TBulkW(3, 65, B2Bs(1))), echo=ec) 
 +     await asyncio.sleep(0.5) 
 +     with stopwatch(): 
 +       dx.BulkWrite((dx.TBulkW(1, 65, B2Bs(0)), dx.TBulkW(2, 65, B2Bs(0)), dx.TBulkW(3, 65, B2Bs(0))), echo=ec) 
 +     dx.Write8(ID, 64, 0, echo=ec) 
 + 
 +   except Exception as ex: 
 +     print('--- Caught Exception ---') 
 +     import sys 
 +     sys.print_exception(ex) 
 +     print('------------------------') 
 + 
 +   fin = True 
 + 
 +  async def test2(): 
 +   global dx, ec, ID, fin 
 +   await asyncio.sleep(0.5) 
 +   while not fin: 
 +     try: 
 +       dx.Write8(ID, 65, dx.Read8(ID, 65, echo=ec) ^ 1, echo=ec) 
 +       await asyncio.sleep(0.02) 
 +     except Exception as ex: 
 +       print('--- Caught Exception ---') 
 +       import sys 
 +       sys.print_exception(ex) 
 +       print('------------------------') 
 + 
 +  def test3(): 
 +   global dx, ec, ID, fin 
 + 
 +   # Basic operation check 
 +   print('reset=', dx.Reboot(1, echo=ec)) 
 +   time.sleep(2) 
 + 
 +   print('ping=', dx.Ping(1, echo=ec)) 
 + 
 +   dx.TxPacket(1, dx.INST_READ, (0x2b, 0x01), True) 
 +   dx.RxPacket(True) 
 + 
 +   r = dx.Read(1, 0, 70, echo=ec) 
 +   print(r.hex(',')) 
 +   print('len=',len(r)) 
 + 
 +   for i in range(15): 
 +     led = dx.Read8(1, 25, echo=ec) ^ 1 
 +     dx.Write8(1, 25, led, echo=ec) 
 +     print(f'led={led}', end = '\r') 
 +     time.sleep(0.2) 
 +   print() 
 + 
 +   dx.Write8(1, 24, 0, echo=ec) 
 +   dx.Write16(1, 6, (4095, 4095), echo=ec) 
 + 
 +   for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)): 
 +     dx.Write16(1, 30, gp, echo=ec) 
 +     pp = dx.Read16(1, 36, signed=True, echo=ec) 
 +     print(f'Goal Pos={gp:6d}, Present Pos={pp:6d}', end='\r') 
 +     time.sleep(0.05) 
 +   else: 
 +     print('') 
 + 
 +   dx.Write8(1, 24, 0, echo=ec) 
 +   dx.Write16(1, 6, (0, 4095),echo=ec) 
 + 
 +  machine.freq(250000000) # set the CPU frequency to 250 MHz 
 + 
 +  dx = DXLProtocolV2(baudrate = 3000000) 
 +  loop = asyncio.get_event_loop() 
 +  loop.create_task(test1()) 
 +  loop.create_task(test2()) 
 +  loop.run_forever() 
 + 
 +  time.sleep(1) 
 +  dx = DXLProtocolV1(baudrate = 57600) 
 +  test3() 
 +</pre> 
 +}}


トップ   差分 リロード印刷に適した表示   全ページ一覧 単語検索 最新ページの一覧   最新ページのRSS 1.0 最新ページのRSS 2.0 最新ページのRSS Atom