14: 2024-01-08 (Mon) 13:54:25 takaboo source Cur: 2024-01-09 (Tue) 23:24:03 takaboo source
Line 327: Line 327:
****インストーラのダウンロードとインストール [#gcd418f7] ****インストーラのダウンロードとインストール [#gcd418f7]
-ここではWindows向けの実行プログラムを生成する事が目的なので、[[ここ>GCC Developer Lite#DOWNLOAD]]から「基本パック」と「WINパック」をダウンロードする。手順を間違えるとインストールできないので、[[こちら>GCC Developer Lite#t1d1c731]]を読んでから作業する。+ここではWindows向けの実行プログラムを生成する事が目的なので、[[ここ>GCC Developer Lite#DOWNLOAD]]から「基本パック」と「WIN64パック」をダウンロードする。手順を間違えるとインストールできないので、[[こちら>GCC Developer Lite#t1d1c731]]を読んでから作業する。
****ソースコード編集・ファイル操作 [#m53faff0] ****ソースコード編集・ファイル操作 [#m53faff0]
Line 336: Line 336:
****コンパイルオプション [#l2754b74] ****コンパイルオプション [#l2754b74]
-64ビット版のWindows上で32ビット版のプログラムは実行できるが、その逆はできない。32ビット版と64ビット版のどちらを選んでも大きな違いは感じないと思うが、ここではどちらのOSでも実行できる32ビット版のプログラムを生成する事にする。+Windowsの64bit版を想定したのでWIN64パックをインストールする事としたが、32bitの場合はWIN32パックを選定するまでである。もし32bit版を適用した場合は64bitとある所を32bit、x64とある所をx86に読み替える事。
-コンパイルしたいソースコードを開いている状態で、「ツール(%%%T%%%)」メニュー内の「コンパイルオプション(%%%O%%%)」をクリックしコンパイルオプションダイアログボックを開く。上端の設定リストをドロップダウンし「Windows x86 (Console)」を選んで''OK''ボタンを押すと、32ビット版のコンパイル条件の設定は全て完了する。+コンパイルしたいソースコードを開いている状態で、「ツール(%%%T%%%)」メニュー内の「コンパイルオプション(%%%O%%%)」をクリックしコンパイルオプションダイアログボックを開く。上端の設定リストをドロップダウンし「Windows x64 (Console)」を選んで''OK''ボタンを押すと、32ビット版のコンパイル条件の設定は全て完了する。
#ref(GDL_SelCompileOption.png) #ref(GDL_SelCompileOption.png)
- 
-ちなみに「Windows x64 (Console)」を選ぶと64ビット版の実行ファイルを生成するためのコンパイル条件となる。 
なお後述の[[ビルド>#w2cf95a0]]や[[デバッグ情報付きビルド>#hf8ec1c4]]を行った既存のソースコードファイルを開き直した場合、「保存済みの環境設定ファイルが見つかりました。」というメッセージダイアログボックスが表示される。 なお後述の[[ビルド>#w2cf95a0]]や[[デバッグ情報付きビルド>#hf8ec1c4]]を行った既存のソースコードファイルを開き直した場合、「保存済みの環境設定ファイルが見つかりました。」というメッセージダイアログボックスが表示される。
Line 1193: Line 1191:
****Pythonその2 [#k91ab321] ****Pythonその2 [#k91ab321]
-全てPythonで書いてしまえば、Cで作られたライブラリをリンクしたり、ctypesの有象無象に呵まれる事も無くスッキリする。~ +全てPythonで書いてしまえば、態々Cで作られたライブラリをリンクしたり、ctypesの有象無象に呵まれる事も無くスッキリする。という事でインストラクションパケットの生成と送信、ステータスパケットの受信、インストラクションの種類によってパラメータ部分を結合・分解するクラスを作ってみた。前記の内容とは一線を画すので、互換性は一切考えていない。
-突如気が向いたので、インストラクションパケットの生成と送信、ステータスパケットの受信、インストラクションの種類によってパラメータ部分を結合・分解するクラスを作ってみた。前記の内容とは一線を画すので、互換性は一切考えていない。+
[[pySerial>https://github.com/pyserial/p​yserial]]は必須。~ [[pySerial>https://github.com/pyserial/p​yserial]]は必須。~
-単体で一通りのAPIを試せるようにデモコードを付けておいた。ターゲットはXM/XH/XDシリーズ、IDは1~4(少なくともID=1は必須)、ボーレートは3Mbpsを想定、デバッグ用にpsutilが必要といったところだ。+なお単体で試せるようにデモコードを付けておいた。ターゲットはXM/XH/XDシリーズ、IDは1~5(少なくともID=1は必須)、ボーレートは3Mbpsを想定、デバッグ用にpsutilが必要、マルチスレッドでアクセスといったところだ。プロトコルV1に対応したDynamixelの場合は末尾のコメントになっているコードを利用してもらえればと。~ 
 +一応関数に与えられたパラメータを確認しているが、全領域で検証したものではない。また値の正負については勝手に判断している部分もあるので、都合が悪ければ適宜修正してもらえればと思う。 
 + 
 +[[ソースへの直リンク>https://github.com/mukyokyo/pyDXL/raw/m​ain/pyDXL.py]] 
#html{{ #html{{
-<pre class="brush: python;"  title="dx2lib.py">+<pre class="brush: python;"  title="pyDXL.py">
#!/usr/bin/python3 #!/usr/bin/python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
-# dx2lib.py +# pyDXL.py 
-# (C) 2024 BestTechnology+# SPDX-License-Identifier: MIT 
 +# SPDX-FileCopyrightText: (C) 2024 mukyokyo
-import serial, threading, array, struct+import serial, threading, multiprocessing, array, struct 
 +from typing import Union
from collections import namedtuple from collections import namedtuple
-from struct import pack, unpack+from struct import pack, unpack, iter_unpack 
 + 
 +########################################​################## 
 +# Functionalized the part of converting int to bytes. 
 +# If specified as a tuple, it is converted to bytes at once. 
 +########################################​################## 
 +def B2Bs(d) -> bytes: 
 +  if isinstance(d, list) or isinstance(d, tuple): 
 +   return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d) 
 +  else: 
 +   return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d)) 
 + 
 +def W2Bs(d) -> bytes: 
 +  if isinstance(d, list) or isinstance(d, tuple): 
 +   return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]]) 
 +  else: 
 +   return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d)) 
 + 
 +def L2Bs(d) -> bytes: 
 +  if isinstance(d, list) or isinstance(d, tuple): 
 +   return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]]) 
 +  else: 
 +   return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d)) 
 + 
 +########################################​################## 
 +# API for Dynamixel protocol V1 
 +########################################​################## 
 +class DXLProtocolV1: 
 +  BROADCASTING_ID = 0xfe 
 +  INST_PING          = 0x01 
 +  INST_READ          = 0x02 
 +  INST_WRITE          = 0x03 
 +  INST_REG_WRITE      = 0x04 
 +  INST_ACTION        = 0x05 
 +  INST_FACTORY_RESET  = 0x06 
 +  INST_REBOOT        = 0x08 
 +  INST_SYNC_WRITE    = 0x83 
 +  INST_SYNG_REG_WRITE = 0x85 
 + 
 +  TSyncW = namedtuple("TSyncW", ("id", ("data"))) 
 + 
 +  def __init__(self, port : Union[serial.Serial, str], baudrate = 57600, timeout = 0.05, lock = None): 
 +   """ 
 +   Initalize 
 + 
 +   parameters 
 +   ------------- 
 +   port : str 
 +     Device name 
 +   baudrate : int 
 +     Serial baudrate[bps] 
 +   timeout : float 
 +     Read timeout[s] 
 +   """ 
 +   if isinstance(port, serial.Serial): 
 +     self.__serial = port 
 +     self.__baudrate = port.baudrate 
 +     self.__timeout = port.timeout 
 +   else: 
 +     self.__serial = serial.Serial(port, baudrate = baudrate, timeout = timeout) 
 +     self.__baudrate = self.__serial.baudrate 
 +     self.__timeout = self.__serial.timeout 
 +   if lock == None: 
 +     self.__lock = threading.Lock() 
 +   else: 
 +     self.__lock = lock 
 +   self.__Error = 0 
 + 
 +  @property 
 +  def lock(self): 
 +   return self.__lock 
 + 
 +  @property 
 +  def baudrate(self): 
 +   return self.__serial.baudrate 
 + 
 +  @baudrate.setter 
 +  def baudrate(self, baudrate): 
 +   self.__baudrate = baudrate 
 +   self.__serial.baudrate = baudrate 
 + 
 +  @property 
 +  def timeout(self): 
 +   return self.__serial.timeout 
 + 
 +  @timeout.setter 
 +  def timeout(self, timeout): 
 +   self.__timeout = timeout 
 +   self.__serial.timeout = timeout 
 + 
 +  def __reconfig(self): 
 +   self.__serial.baudrate = self.__baudrate 
 +   self.__serial.timeout = self.__timeout 
 + 
 +  @property 
 +  def Error(self): 
 +   return self.__Error 
 + 
 +  def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> (bytes, bool): 
 +   """ 
 +   Sending packets 
 + 
 +   parameters 
 +   ------------- 
 +   id : int 
 +     Target ID 
 +   inst : int 
 +     Instruction command 
 +   param : bytes 
 +     Packet parameters 
 + 
 +   Returns 
 +   ------- 
 +   bytes 
 +     Packets sent 
 +   bool 
 +     Success or failure 
 +   """ 
 +   self.__reconfig() 
 +   if ((id == self.BROADCASTING_ID) or (id >= 0 and id <= 253)) and len(param) <= (256 - 6): 
 +     instp = bytearray([0xff,0xff,id,0,inst]) + bytes(param) 
 +     instp[3] = len(instp) - 3 
 +     instp += B2Bs(~sum(instp[2:]) & 0xff) 
 +     self.__serial.reset_input_buffer() 
 +     if echo: print('TX:', instp.hex(':')) 
 +     self.__serial.write(instp) 
 +     return bytes(instp), True 
 +   return None, False 
 + 
 +  def __rx(self, length) -> bytes: 
 +   s = self.__serial.read(length) 
 +   l = len(s) 
 +   if l == length: 
 +     return s 
 +   else: 
 +     r = s 
 +     length -= l 
 +     if length > 0: 
 +       while self.__serial.in_waiting > 0: 
 +         s = self.__serial.read(length) 
 +         r += s 
 +         length -= len(s) 
 +         if length == 0: 
 +           break 
 +     return r 
 + 
 +  def RxPacket(self, echo = False) -> (bytes, bool): 
 +   """ 
 +   Receiving packets 
 + 
 +   Returns 
 +   ------- 
 +   bytes 
 +     Packets received 
 +   bool 
 +     Success or failure 
 +   """ 
 +   statp = self.__rx(5) 
 +   if statp: 
 +     if len(statp) == 5: 
 +       if statp[0] == 0xff and statp[1] == 0xff: 
 +         l = statp[3] - 1 
 +         self.__Error = statp[4] 
 +         statp += self.__rx(l) 
 +         if len(statp) == l + 5: 
 +           if statp[-1:][0] == ((~sum(statp[2:-1])) & 0xff): 
 +             if echo: print('RX:', statp.hex(':')) 
 +             return bytes(statp), (statp[4] & 0x40) == 0 
 +   return None, False 
 + 
 +  def Write(self, id : int, addr : int, data : bytes, echo = False) -> bool: 
 +   """ 
 +   Write instruction 
 + 
 +   parameters 
 +   ------------- 
 +   id : int 
 +     Target ID 
 +   addr : int 
 +     Target item address 
 +   data : bytes 
 +     Data to be written 
 + 
 +   Returns 
 +   ------- 
 +   result : bool 
 +     Success or failure 
 +   """ 
 +   with self.__lock: 
 +     if id >= 0 and id <= self.BROADCASTING_ID and addr >= 0  and addr <= 254: 
 +       if self.TxPacket(id, self.INST_WRITE, B2Bs(addr) + data, echo)[1]: 
 +         if id != self.BROADCASTING_ID: 
 +           dat, r = self.RxPacket(echo) 
 +           if r: 
 +             return dat[2] == id and (dat[4] & 0x18) == 0 
 +         else: 
 +           return True 
 +     return False 
 + 
 +  def Write8(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: 
 +   return self.Write(id, addr, B2Bs(data), echo) 
 + 
 +  def Write16(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: 
 +   return self.Write(id, addr, W2Bs(data), echo) 
 + 
 +  def Write32(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: 
 +   return self.Write(id, addr, L2Bs(data), echo) 
 + 
 +  def Read(self, id : int, addr : int, length : int, echo = False) -> bytes: 
 +   """ 
 +   Read instruction 
 + 
 +   parameters 
 +   ------------- 
 +   id : int 
 +     Target ID 
 +   addr : int 
 +     Target item address 
 +   length : int 
 +     Number of bytes to read 
 + 
 +   Returns 
 +   ------- 
 +   bytes 
 +     Data read 
 +   """ 
 +   with self.__lock: 
 +     if id >= 0 and id <= 253 and addr >= 0 and addr <= 254 and length > 0 and length <= (256 - 6): 
 +       if self.TxPacket(id, self.INST_READ, B2Bs((addr, length)), echo)[1]: 
 +         dat, r = self.RxPacket(echo) 
 +         if r: 
 +           if dat[2] == id and (dat[4] & 0x8) == 0: 
 +             return bytes(dat[5:-1]) 
 +     return None 
 + 
 +  def Read8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
 +   r = self.Read(id, addr, length, echo) 
 +   if r != None: 
 +     n = sum(iter_unpack('b' if signed else 'B', r), ()) 
 +     return n if length > 1 else n[0] 
 +   return None 
 + 
 +  def Read16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
 +   r = self.Read(id, addr, 2 << (length - 1), echo) 
 +   if r != None: 
 +     n = sum(iter_unpack('h' if signed else 'H', r), ()) 
 +     return n if length > 1 else n[0] 
 +   return None 
 + 
 +  def Read32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
 +   r = self.Read(id, addr, 4 << (length - 1), echo) 
 +   if r != None: 
 +     n = sum(iter_unpack('i' if signed else 'I', r), ()) 
 +     return n if length > 1 else n[0] 
 +   return None 
 + 
 +  def SyncWrite(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool: 
 +   """ 
 +   Sync Write instruction 
 + 
 +   parameters 
 +   ------------- 
 +   addr : int 
 +     Target item address 
 +   length : int 
 +     Number of bytes to write 
 +   id_datas : (TSyncW) 
 +     Target ID and data 
 + 
 +   Returns 
 +   ------- 
 +   bool 
 +     Success or failure 
 +   """ 
 +   with self.__lock: 
 +     if addr >= 0 and addr <= 254 and length > 0 and length < (256 - 6): 
 +       param = B2Bs((addr,length)) 
 +       for d in id_datas: 
 +         param += B2Bs(d.id) + d.data 
 +         if len(d.data) != length or d.id < 0 or d.id > 253: 
 +           del param 
 +           return False 
 +       return self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)[1] 
 +     return False 
 + 
 +  def Ping(self, id : int, echo = False) -> bool: 
 +   with self.__lock: 
 +     if self.TxPacket(id, self.INST_PING, bytes(), echo)[1]: 
 +       dat, r = self.RxPacket(echo) 
 +       if r: 
 +         return id == dat[2] and dat[3] == 2 
 +     return False 
 + 
 +  def FactoryReset(self, id : int, echo = False) -> bool: 
 +   with self.__lock: 
 +     if self.TxPacket(id, self.INST_FACTORY_RESET, bytes(), echo)[1]: 
 +       dat, r = self.RxPacket(echo) 
 +       if r: 
 +         return id == dat[2] and dat[3] == 2 
 +     return False 
 + 
 +  def Reboot(self, id : int, echo = False) -> bool: 
 +   with self.__lock: 
 +     if self.TxPacket(id, self.INST_REBOOT, bytes(), echo)[1]: 
 +       dat, r = self.RxPacket(echo) 
 +       if r: 
 +         return id == dat[2] and dat[3] == 2 
 +     return False
-class dx2lib:+########################################​################## 
 +# API for Dynamixel protocol V2 
 +########################################​################## 
 +class DXLProtocolV2:
  BROADCASTING_ID = 0xfe   BROADCASTING_ID = 0xfe
  INST_PING                = 0x01   INST_PING                = 0x01
Line 1216: Line 1529:
  INST_REG_WRITE            = 0x04   INST_REG_WRITE            = 0x04
  INST_ACTION              = 0x05   INST_ACTION              = 0x05
-  INST_RESET                = 0x06+  INST_FACTORY_RESET        = 0x06
  INST_REBOOT              = 0x08   INST_REBOOT              = 0x08
  INST_SYS_WRITE            = 0x0d   INST_SYS_WRITE            = 0x0d
Line 1234: Line 1547:
  TBulkR = namedtuple("TBulkR", ("id", "addr", "length"))   TBulkR = namedtuple("TBulkR", ("id", "addr", "length"))
-  __crc16_lutable = array.array('H',[])+  __crc16_lutable = array.array('H')
-  def __init__(self, port : str, baudrate : int = 57600, timeout : float  = 0.05):+  def __init__(self, port : Union[serial.Serial, str], baudrate = 57600, timeout = 0.05, lock = None):
   """    """
   Initalize    Initalize
Line 1249: Line 1562:
     Read timeout[s]      Read timeout[s]
   """    """
-   self.__serial = serial.Serial(port, baudrate, timeout = timeout) +   if isinstance(port, serial.Serial): 
-    self.__baudrate = baudrate +     self.__serial = port 
-    self.__timeout = timeout; +     self.__baudrate = port.baudrate 
-   self.__mutex = threading.Lock()+     self.__timeout = port.timeout 
 +   else: 
 +     self.__serial = serial.Serial(port, baudrate = baudrate, timeout = timeout) 
 +      self.__baudrate = self.__serial.baudrate 
 +      self.__timeout = self.__serial.timeout 
 +   if lock == None: 
 +     self.__lock = threading.Lock() 
 +   else: 
 +     self.__lock = lock 
 +   self.__Error = 0 
 +   poly = 0x8005
   for i in range(256):    for i in range(256):
     nData = i << 8      nData = i << 8
     nAccum = 0      nAccum = 0
-     poly = 0x8005 
     for j in range(8):      for j in range(8):
-       if ((nData ^ nAccum) & 0x8000): +       nAccum = ((nAccum << 1) ^ poly if (nData ^ nAccum) & 0x8000 else nAccum << 1) & 0xffff
-         nAccum = (nAccum << 1) ^ poly +
-&nbsp;      else+
-         nAccum <<= 1+
       nData <<= 1        nData <<= 1
-     self.__crc16_lutable.append(nAccum & 0xffff)+     self.__crc16_lutable.append(nAccum)
  @property   @property
-  def mutex(self): +  def lock(self): 
-   return self.__mutex+   return self.__lock
  @property   @property
  def baudrate(self):   def baudrate(self):
-   return self.__ser.baudrate+   return self.__serial.baudrate
  @baudrate.setter   @baudrate.setter
Line 1286: Line 1605:
   self.__timeout = timeout    self.__timeout = timeout
   self.__serial.timeout = timeout    self.__serial.timeout = timeout
 +
 +  def __reconfig(self):
 +   self.__serial.baudrate = self.__baudrate
 +   self.__serial.timeout = self.__timeout
 +
 +  @property
 +  def Error(self):
 +   return self.__Error
  def __crc16(self, data : bytes) -> int:   def __crc16(self, data : bytes) -> int:
   crc = 0    crc = 0
   for d in data:    for d in data:
-     crc = (crc << 8) ^ self.__crc16_lutable[(((crc >> 8) ^ d) & 0xff)];+     crc = (crc << 8) ^ self.__crc16_lutable[(((crc >> 8) ^ d) & 0xff)]
   return crc & 0xffff    return crc & 0xffff
Line 1303: Line 1630:
   inst : int    inst : int
     Instruction command      Instruction command
-   param : bytearray+   param : bytes
     Packet parameters      Packet parameters
   Returns    Returns
   -------    -------
-   bytes | None+   bytes
     Packets sent      Packets sent
   bool    bool
     Success or failure      Success or failure
   """    """
-   if (id == self.BROADCASTING_ID) or (id <= 252):+   self.__reconfig() 
 +   if ((id == self.BROADCASTING_ID) or (id >= 0 and id <= 252)) and len(param) < (65536 - 10):
     instp = bytearray([0xff,0xff,0xfd,0x00,id,0,0,in​st]) + bytearray(param).replace(b'\xff\xff\xfd'​,b'\xff\xff\xfd\xfd')      instp = bytearray([0xff,0xff,0xfd,0x00,id,0,0,in​st]) + bytearray(param).replace(b'\xff\xff\xfd'​,b'\xff\xff\xfd\xfd')
-     instp[5:7] = bytes(tuple(pack('<H', len(instp) - 5))+     instp[5:7] = W2Bs(len(instp) - 5) 
-     instp += bytes(tuple(pack('<H', self.__crc16(instp))))+     instp += W2Bs(self.__crc16(instp))
     self.__serial.reset_input_buffer()      self.__serial.reset_input_buffer()
-     if echo: print('TX:', instp.hex(','))+     if echo: print('TX:', instp.hex(':'))
     self.__serial.write(instp)      self.__serial.write(instp)
     return bytes(instp), True      return bytes(instp), True
   return None, False    return None, False
 +
 +  def __rx(self, length) -> bytes:
 +   s = self.__serial.read(length)
 +   l = len(s)
 +   if l == length:
 +     return s
 +   else:
 +     r = s
 +     length -= l
 +     if length > 0:
 +       while self.__serial.in_waiting > 0:
 +         s = self.__serial.read(length)
 +         r += s
 +         length -= len(s)
 +         if length == 0:
 +           break
 +     return r
  def RxPacket(self, echo = False) -> (bytes, bool):   def RxPacket(self, echo = False) -> (bytes, bool):
Line 1329: Line 1674:
   Returns    Returns
   -------    -------
-   bytes | None+   bytes
     Packets received      Packets received
   bool    bool
     Success or failure      Success or failure
   """    """
-   statp = self.__serial.read(9)+   self.__serial.flush() 
 +   statp = self.__rx(9)
   if statp:    if statp:
     if len(statp) == 9:      if len(statp) == 9:
-       l = unpack('<H', statp[5:7])[0] - 2 +       if statp[0] == 0xff and statp[1] == 0xff and statp[2] == 0xfd and statp[3] == 0 and statp[7] == 0x55: 
-        statp += self.__serial.read(l) +         l = unpack('<H', statp[5:7])[0] - 2 
-        if len(statp) == l + 9: +          self.__Error = statp[8] 
-          if unpack('<H', statp[-2:])[0] == self.__crc16(statp[:-2]): +         statp += self.__rx(l) 
-            statp = statp[0:9] + statp[9:].replace(b'\xff\xff\xfd\xfd',b'​\xff\xff\xfd') +          if len(statp) == l + 9: 
-            if echo: print('RX:', statp.hex(',')) +            if unpack('<H', statp[-2:])[0] == self.__crc16(statp[:-2]): 
-            return bytes(statp), True+              statp = statp[0:9] + statp[9:].replace(b'\xff\xff\xfd\xfd',b'​\xff\xff\xfd') 
 +              if echo: print('RX:', statp.hex(':')) 
 +              return bytes(statp), (statp[8] & 0x7f) == 0
   return None, False    return None, False
-  def WriteBlockData(self, id : int, addr : int, data : bytes, echo = False) -> bool:+  def Write(self, id : int, addr : int, data : bytes, echo = False) -> bool:
   """    """
   Write instruction    Write instruction
Line 1364: Line 1712:
     Success or failure      Success or failure
   """    """
-   with self.__mutex+   with self.__lock
-     if (id <= 252) or (id == self.BROADCASTING_ID): +     if ((id >= 0 and id <= 252) or (id == self.BROADCASTING_ID)) and addr >= 0 and addr <= 65535
-       _, r = self.TxPacket(id, self.INST_WRITE, memoryview(pack('<H', addr) + data), echo) +       if self.TxPacket(id, self.INST_WRITE, W2Bs(addr) + data, echo)[1]: 
-        if r+          if id != self.BROADCASTING_ID
-          d, r = self.RxPacket(echo) +            dat, r = self.RxPacket(echo) 
-          return r and (d[4] == id)+            if r
 +             return dat[4] == id and (dat[8] & 0x7f) == 0 
 +         else: 
 +           return True
     return False      return False
-  def WriteByteData(self, id : int, addr : int, data : int, echo = False) -> bool: +  def Write8(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: 
-   return self.WriteBlockData(id, addr, bytes((data,)), echo)+   return self.Write(id, addr, B2Bs(data), echo)
-  def WriteWordData(self, id : int, addr : int, data : int, echo = False) -> bool: +  def Write16(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: 
-   return self.WriteBlockData(id, addr, memoryview(bytes(tuple(pack('<H', data)))), echo)+   return self.Write(id, addr, W2Bs(data), echo)
-  def WriteLongData(self, id : int, addr : int, data : int, echo = False) -> bool: +  def Write32(self, id : int, addr : int, data : Union[int, tuple, list], echo = False) -> bool: 
-   return self.WriteBlockData(id, addr, memoryview(bytes(tuple(pack('<l', data)))), echo)+   return self.Write(id, addr, L2Bs(data), echo)
-  def ReadBlockData(self, id : int, addr : int, length : int, echo = False) -> (bytes, bool):+  def Read(self, id : int, addr : int, length : int, echo = False) -> bytes:
   """    """
   Read instruction    Read instruction
Line 1396: Line 1747:
   Returns    Returns
   -------    -------
-   bytes | None+   bytes
     Data read      Data read
   bool    bool
     Success or failure      Success or failure
   """    """
-   with self.__mutex+   with self.__lock
-     if (id <= 252) and (length > 0): +     if id >= 0 and id <= 252 and addr >= 0 and addr <= 65535 and length > 0 and length < (65536 - 10): 
-       _, r = self.TxPacket(id, self.INST_READ, memoryview(bytes(list(pack('<H', addr))) + bytes(list(pack('<H', length)))), echo) +       if self.TxPacket(id, self.INST_READ, W2Bs(addr) + W2Bs(length), echo)[1]:
-       if r:+
         dat, r = self.RxPacket(echo)          dat, r = self.RxPacket(echo)
         if r:          if r:
-           return bytes(dat[9:-2]), (id == dat[4]) +           return bytes(dat[9:-2]) 
-     return None, False+     return None
-  def ReadByteData(self, id : int, addr : int, echo = False) -> int: +  def Read8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
-   dat, r = self.ReadBlockData(id, addr, 1, echo) +   r = self.Read(id, addr, length, echo) 
-   if r: +   if r != None
-     return int(dat[0])+     n = sum(iter_unpack('b' if signed else 'B', r), ()) 
 +     return n if length > 1 else n[0]
   return None    return None
-  def ReadWordData(self, id : int, addr : int, echo = False) -> int: +  def Read16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
-   dat, r = self.ReadBlockData(id, addr, 2, echo) +   r = self.Read(id, addr, 2 << (length - 1), echo) 
-   if r: +   if r != None
-     return unpack('<H', dat[0:2])[0]+     n = sum(iter_unpack('h' if signed else 'H', r), ()) 
 +     return n if length > 1 else n[0]
   return None    return None
-  def ReadLongData(self, id : int, addr : int, echo = False) -> int: +  def Read32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: 
-   dat, r = self.ReadBlockData(id, addr, 4, echo) +   r = self.Read(id, addr, 4 << (length - 1), echo) 
-   if r: +   if r != None
-     return unpack('<l', dat[0:4])[0]+     n = sum(iter_unpack('i' if signed else 'I', r), ()) 
 +     return n if length > 1 else n[0]
   return None    return None
-  def WriteSyncData(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool: +  def SyncWrite(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool: 
-   with self.__mutex+   """ 
-     param = bytes(pack('<H', addr)) + bytes(pack('<H', length)+   Sync Write instruction 
-      for d in id_datas: + 
-        param += bytes((d.id,)) + d.data +   parameters 
-      _, r = self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, memoryview(param), echo) +   ------------- 
-     del param +   addr : int 
-     return r+     Target item address 
 +   length : int 
 +     Number of bytes to write 
 +   id_datas : (TSyncW) 
 +     Target ID and data 
 + 
 +   Returns 
 +   ------- 
 +   bool 
 +     Success or failure 
 +   """ 
 +   with self.__lock
 +     if addr >= 0 and addr <= 65535 and length > 0 and length < (65536 - 10): 
 +       param = W2Bs(addr) + W2Bs(length) 
 +        for d in id_datas: 
 +          param += bytes((d.id,)) + d.data 
 +          if len(d.data) != length or d.id < 0 or d.id > 252: 
 +           del param 
 +           return False 
 +       return self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)[1] 
 +     return False
-  def ReadSyncData(self, addr : int, length : int, ids : (int), echo = False) -> tuple:+  def SyncRead(self, addr : int, length : int, ids : (int), echo = False) -> tuple:
   """    """
   Sync Read instruction    Sync Read instruction
Line 1447: Line 1820:
   length : int    length : int
     Number of bytes to read      Number of bytes to read
-   ids : Tuple[int] +   ids : (int) 
-     Data to be written+     Target IDs
   Returns    Returns
   -------    -------
-   bytes | None +   tuple 
-     Data read +     Read ID and data
-   bool +
-     Success or failure+
   """    """
-   with self.__mutex:+   with self.__lock:
     result = ()      result = ()
-     _, r = self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_READ, memoryview(bytes(pack('<H', addr)) + bytes(pack('<H', length)) + bytes(ids)), echo) +     if addr >= 0 and addr < 65535 and length > 0 and length < (65536 - 10): 
-     if r+       if self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_READ, W2Bs(addr) + W2Bs(length) + B2Bs(ids), echo)[1]
-        for id in ids: +          for id in ids: 
-          dat, r = self.RxPacket(echo) +            dat, r = self.RxPacket(echo) 
-          if r: +            if r: 
-            result += (id, bytes(dat[9:9 + length])), +              if dat[4] == id: 
-          else: +               result += (id, bytes(dat[9:9 + length])), 
-           result += (id, bytes([])),+              else: 
 +               result += (id, bytes([])), 
 +           else: 
 +             result += (id, bytes([])),
     return result      return result
-   return None 
-  def WriteBulkData(self, data : (TBulkW), echo = False) -> bool: +  def BulkWrite(self, data : (TBulkW), echo = False) -> bool: 
-   with self.__mutex:+   """ 
 +   Bulk Write instruction 
 + 
 +   parameters 
 +   ------------- 
 +   data : (TBulkW) 
 +     Target ID and address, data 
 +   ids : (TSyncW) 
 +     Target ID and data 
 + 
 +   Returns 
 +   ------- 
 +   bool 
 +     Success or failure 
 +   """ 
 +   with self.__lock:
     param = bytes()      param = bytes()
     for d in data:      for d in data:
-       param += pack('B', d.id) + pack('<H', d.addr) + pack('<H', len(d.data)) + d.data +       if d.id >= 0 and d.id <= 252 and d.addr >= 0 and d.addr <= 65535: 
-     _, r = self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_WRITE, memoryview(param), echo) +         param += B2Bs(d.id) + W2Bs(d.addr) + W2Bs(len(d.data)) + d.data 
-     del param +       else: 
-     return r+         del param 
 +         return False 
 +     return self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_WRITE, param, echo)[1]
-  def ReadBulkData(self, data:(TBulkR), echo = False) -> tuple: +  def BulkRead(self, data:(TBulkR), echo = False) -> tuple: 
-   with self.__mutex:+   """ 
 +   Bulk Read instruction 
 + 
 +   parameters 
 +   ------------- 
 +   data : (TBulkR) 
 +     ID, address, and number of bytes to be read 
 + 
 +   Returns 
 +   ------- 
 +   tuple 
 +     Read ID and data 
 +   """ 
 +   with self.__lock:
     result = ()      result = ()
     param = bytes()      param = bytes()
     for d in data:      for d in data:
-       param += bytes(pack('B', d.id)) + bytes(pack('<H', d.addr)) + bytes(pack('<H', d.length)+       if d.id < 0 or d.addr < 0 or d.length < 0: 
-     _, r = self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_READ, memoryview(param), echo) +         return result 
-     del param +       param += B2Bs(d.id) + W2Bs(d.addr) + W2Bs(d.length) 
-     if r:+     if self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_READ, param, echo)[1]:
       for d in data:        for d in data:
         rxd, r = self.RxPacket(echo)          rxd, r = self.RxPacket(echo)
Line 1497: Line 1900:
         else:          else:
           result += (d.id, bytes([])),            result += (d.id, bytes([])),
 +     del param
     return result      return result
  def Ping(self, id : int, echo = False) -> bool:   def Ping(self, id : int, echo = False) -> bool:
-   with self.__mutex+   with self.__lock
-     _, r = self.TxPacket(id, self.INST_PING, bytes(), echo) +     if self.TxPacket(id, self.INST_PING, bytes(), echo)[1]:
-     if r:+
       rxd, r = self.RxPacket(echo)        rxd, r = self.RxPacket(echo)
-       return r and (id == rxd[4])+       if r
 +         return id == rxd[4] and rxd[5] == 7 and rxd[6] == 0
     return False      return False
-  def Reset(self, id : int, echo = False) -> bool: +  def FactoryReset(self, id : int, p1 : int, echo = False) -> bool: 
-   with self.__mutex+   with self.__lock
-     _, r = self.TxPacket(id, self.INST_RESET, bytes(0xff), echo) +     if self.TxPacket(id, self.INST_FACTORY_RESET, bytes((p1,)), echo)[1]
-     if r+       rxd, r = self.RxPacket(echo) 
-       dat, r = self.RxPacket(echo) +       if r
-       return r and (id == dat[4]) and ((dat[8] & 0x7f) == 0)+         return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0
     return False      return False
  def Reboot(self, id : int, echo = False) -> bool:   def Reboot(self, id : int, echo = False) -> bool:
-   with self.__mutex+   with self.__lock
-     _, r = self.TxPacket(id, self.INST_REBOOT, bytes(), echo) +     if self.TxPacket(id, self.INST_REBOOT, bytes(), echo)[1]
-     if r+       rxd, r = self.RxPacket(echo) 
-       dat, r = self.RxPacket(echo) +       if r: 
-       return r and (id == dat[4]) and ((dat[8] & 0x7f) == 0)+         return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0
     return False      return False
-  def B2B(self, d): +  def Clear(self, id : int, val, echo = False) -&gt; bool
-   if isinstance(d, list) or isinstance(d, tuple): +   with self.__lock
-     r = () +     if self.TxPacket(id, self.INST_CLEAR, B2Bs(1) + L2Bs(val), echo)[1]
-&nbsp;     for d in d: +        rxd, r = self.RxPacket(echo
-       r += tuple(pack('<B', d)) +       if r: 
-     return bytes(r) +          return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0 
-   else+     return False
-     return bytes(pack('<B', d)) +
- +
-  def W2B(self, d)+
-   if isinstance(d, list) or isinstance(d, tuple): +
-      r = () +
-     for d in d: +
-       r += tuple(pack('<H', d)) +
-     return bytes(r) +
-   else+
-      return bytes(pack('<H', d)) +
- +
-  def L2B(self, d): +
-   if isinstance(d, list) or isinstance(d, tuple): +
-     r = () +
-     for d in d: +
-       r += tuple(pack('<I', d)) +
-     return bytes(r) +
-   else: +
-     return bytes(pack('<I', d))+
 +########################################​##################
 +# test code
 +########################################​##################
if __name__ == "__main__": if __name__ == "__main__":
  from contextlib import contextmanager   from contextlib import contextmanager
Line 1555: Line 1943:
  import time, gc, psutil, os   import time, gc, psutil, os
- +  ec = False
-  #dx = dx2lib('/dev/ttySC2', 1000000) +
-  dx = dx2lib('\\\\.\\COM12', 3000000) +
-  echo = False+
  ID = 1   ID = 1
  fin = False   fin = False
Line 1566: Line 1951:
   start_time = time.time()    start_time = time.time()
   yield    yield
-   print('..proc time={:.1f}ms {}'.format((time.time() - start_time)*1000, psutil.Process(os.getpid()).memory_info(​).rss))+   #print('..proc time={:.1f}ms {}'.format((time.time() - start_time) * 1000, psutil.Process(os.getpid()).memory_info(​).rss))
-  def func1(): +  def func1(dx): 
-   global dx, ID, fin, echo+   global ID, fin, ec
   try:    try:
Line 1575: Line 1960:
     """ reset dxl """      """ reset dxl """
     with stopwatch():      with stopwatch():
-       r = dx.Reset(ID, echo) +       r = dx.FactoryReset(ID, 0xff, echo = ec
-     print(f'Reset({ID}): {r}')+     print(f'FactoryReset({ID}): {r}')
     time.sleep(0.5)      time.sleep(0.5)
     '''      '''
 + 
     """ ping dxl """      """ ping dxl """
-     print(f'Ping({ID}):', dx.Ping(ID, echo))+     print(f'Ping({ID})=', dx.Ping(ID, echo = ec))
     """ reboot dxl """      """ reboot dxl """
     with stopwatch():      with stopwatch():
-       r = dx.Reboot(ID, echo) +       r = dx.Reboot(ID, echo = ec
-     print(f'Reboot({ID}): {r}')+     print(f'Reboot({ID})={r}')
     time.sleep(0.5)      time.sleep(0.5)
Line 1594: Line 1979:
         r0 = dx.TxPacket(ID, dx.INST_WRITE, (65, 0, 1))          r0 = dx.TxPacket(ID, dx.INST_WRITE, (65, 0, 1))
         r1 = dx.RxPacket()          r1 = dx.RxPacket()
-     print(f'TxPacket({ID}): {r0[1]}', r0[0].hex(',')) +     print(f'TxPacket({ID})={r0[1]}', r0[0].hex(':')) 
-     print(f'RxPacket({ID}): {r1[1]}', r1[0].hex(','))+     if r1[0]: 
 +       print(f'RxPacket({ID})={r1[1]}', r1[0].hex(':')) 
 + 
 +     """ dump memory """ 
 +     l = 50 
 +     for addr in range(0, 700, l): 
 +       r = dx.Read(ID, addr, l, echo = ec) 
 +       print(f'Read({addr};{l})=',r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!')
     """ read byte item """      """ read byte item """
     with stopwatch():      with stopwatch():
-       r = dx.ReadByteData(ID, 65, echo) +       r = dx.Read8(ID, 65, echo = ec
-     print(f'ReadByteData({ID}): {r}')+     print(f'Read8({ID})={r}')
     """ sync read inst. """      """ sync read inst. """
-     print('ReadSyncData:') +     print('SyncRead=') 
-     d = dx.ReadSyncData(0, 4, (1, 2, 3), True)+     with stopwatch(): 
 +       d = dx.SyncRead(0, 4, (1, 2, 3, 4, 5), echo = ec)
     for d in d:      for d in d:
-       print(f' ({d[0]})', d[1].hex(','))+       print(f' ({d[0]}) 0,4 hex=', d[1].hex(':') if len(d[1]) > 0 else ()) 
 +     with stopwatch(): 
 +       d = dx.SyncRead(132, 4, (1, 2, 3, 4, 5), echo = ec) 
 +     for d in d: 
 +       print(f' ({d[0]}) 132,4 int32=', unpack('<i', pack('<I', unpack('<I', d[1])[0]))[0] if len(d[1]) > 0 else ())
     """ sync write inst. """      """ sync write inst. """
-     time.sleep(2) +     for i in range(30):
-     for i in range(20):+
       with stopwatch():        with stopwatch():
-         dx.WriteSyncData(65, 1, (dx.TSyncW(1,dx.B2B(1)),dx.TSyncW(2,dx.B2B(1)),dx.TSyncW(3,dx.B2B(1))), echo)+         dx.SyncWrite(65, 1, (dx.TSyncW(1, B2Bs(1)), dx.TSyncW(2, B2Bs(1)), dx.TSyncW(3, B2Bs(1)), dx.TSyncW(4, B2Bs(1)), dx.TSyncW(5, B2Bs(1))), echo = ec)
       time.sleep(0.05)        time.sleep(0.05)
       with stopwatch():        with stopwatch():
-         dx.WriteSyncData(65, 1, (dx.TSyncW(1,dx.B2B(0)),dx.TSyncW(2,dx.B2B(0)),dx.TSyncW(3,dx.B2B(0))), echo)+         dx.SyncWrite(65, 1, (dx.TSyncW(1, B2Bs(0)), dx.TSyncW(2, B2Bs(0)), dx.TSyncW(3, B2Bs(0)), dx.TSyncW(4, B2Bs(0)), dx.TSyncW(5, B2Bs(0))), echo = ec)
       time.sleep(0.05)        time.sleep(0.05)
     """ set goal position """      """ set goal position """
     #torque off      #torque off
-     if dx.WriteByteData(ID, 64, 0, echo):+     if dx.Write8(ID, 64, 0, echo = ec):
       #multi turn        #multi turn
-       if dx.WriteByteData(ID, 11, 4, echo):+       if dx.Write8(ID, 11, 4, echo = ec):
         #torque on          #torque on
-         if dx.WriteByteData(ID, 64, 1, echo): +         if dx.Write8(ID, 64, 1, echo = ec): 
-           #set goal position+           print(f'Write32/Read32({ID})')
           for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)):            for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)):
-             if dx.WriteLongData(ID, 116, gp, echo):+             for i in range(10): 
 +               #set goal position 
 +               with stopwatch(): 
 +                 dx.Write32(ID, 116, gp, echo = ec)
               #get present position                #get present position
               with stopwatch():                with stopwatch():
-                 pp = unpack('>l', pack('>l', dx.ReadLongData(ID, 132, echo)))[0] +                 pp = dx.Read32(ID, 132, signed = True, echo = ec
-               print(f'{gp:6} {pp:6}') +               if pp != None: 
-               time.sleep(0.05)+                 print(f' w={gp:6} r={pp:6} diff={gp-pp:5}', end='\r') 
 +               else: 
 +                 pass 
 +                 print('None                            ', end='\r') 
 +                 break 
 +               time.sleep(0.01)
             else:              else:
-               break+               continue 
 +             pass 
 +             break 
 +           print('')
           #torque off            #torque off
-           dx.WriteByteData(ID, 64, 0, echo)+           with stopwatch(): 
 +             dx.Write8(ID, 64, 0, echo = ec)
           #normal turn            #normal turn
-           dx.WriteByteData(ID, 11, 3, echo+           with stopwatch(): 
-           print('')+             dx.Write8(ID, 11, 3, echo = ec)
-     """ test add/remove suffixes """ +     """ block read/write (add/remove suffixes) """ 
-     fffffd = array.array('B',(+     with stopwatch(): 
 +       r = dx.Read(ID, 120, 27, echo = ec) 
 +     if r: 
 +       print(f'Read({ID};128)=', tuple(iter_unpack('<HBBhhiiIIHB', r))[0]) 
 + 
 +     fffffd = bytes((
       0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,        0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
       0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,        0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
       0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,        0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
-      ))+      ))
     with stopwatch():      with stopwatch():
-       r = dx.WriteBlockData(ID, 634, fffffd, echo) +       r = dx.Write(ID, 634, fffffd, echo = ec
-     print(f'WriteBlockData({ID}): {r}')+     print(f'Write({ID})={r}')
     with stopwatch():      with stopwatch():
-       r = dx.ReadBlockData(ID, 634, len(fffffd), echo) +       r = dx.Read(ID, 634, len(fffffd), echo = ec
-     print(f'ReadBlockData({ID}): {r[1]}', r[0].hex(','))+     if r: 
 +       print(f'Read({ID})=', r.hex(':'))
     """ bulk read inst. """      """ bulk read inst. """
     with stopwatch():      with stopwatch():
-       d = dx.ReadBulkData((dx.TBulkR(1,0,10), dx.TBulkR(2,0,20), dx.TBulkR(3,0,30), dx.TBulkR(4,0,40)), echo)+       d = dx.BulkRead((dx.TBulkR(1,0,10), dx.TBulkR(2,0,20), dx.TBulkR(3,0,30), dx.TBulkR(4,0,40), dx.TBulkR(5,0,40)), echo = ec) 
 +     print('BulkRead=')
     for d in d:      for d in d:
-       print(f' ({d[0]},0)', d[1].hex(','))+       print(f' ({d[0]},0)', d[1].hex(':'))
 +     dx.Write8(ID, 64, 1, echo = ec)
     """ bulk write inst. """      """ bulk write inst. """
-     dx.WriteByteData(ID, 64, 1, echo)+     print('BulkWrite=')
     with stopwatch():      with stopwatch():
-       dx.WriteBulkData((dx.TBulkW(1,104, dx.L2B((0,0,0,1024))), dx.TBulkW(2,65,dx.B2B(0)), dx.TBulkW(3,65,dx.B2B(0))), echo)+       print(' 1', dx.BulkWrite((dx.TBulkW(1,104, L2Bs((0,0,0,1024))), dx.TBulkW(2,65,B2Bs(0)), dx.TBulkW(3,65,B2Bs(0)), dx.TBulkW(4,65,B2Bs(0))), echo = ec))
     time.sleep(0.5)      time.sleep(0.5)
     with stopwatch():      with stopwatch():
-       dx.WriteBulkData((dx.TBulkW(1,104, dx.L2B(0)+dx.L2B(0)+dx.L2B(0)+dx.L2B(2048)), dx.TBulkW(2,65,dx.B2B(1)), dx.TBulkW(3,65,dx.B2B(1))), echo)+       print(' 2', dx.BulkWrite((dx.TBulkW(1,104, L2Bs(0)+L2Bs(0)+L2Bs(0)+L2Bs(2048)), dx.TBulkW(2,65,B2Bs(1)), dx.TBulkW(3,65,B2Bs(1)), dx.TBulkW(4,65,B2Bs(1))), echo = ec))
     time.sleep(0.5)      time.sleep(0.5)
     with stopwatch():      with stopwatch():
-       dx.WriteBulkData((dx.TBulkW(1,104, dx.L2B(0)+dx.L2B(0)+dx.L2B(0)+dx.L2B(1024)), dx.TBulkW(2,65,dx.B2B(0)), dx.TBulkW(3,65,dx.B2B(0))), echo)+       print(' 3', dx.BulkWrite((dx.TBulkW(1,104, L2Bs(0)+L2Bs(0)+L2Bs(0)+L2Bs(1024)), dx.TBulkW(2,65,B2Bs(0)), dx.TBulkW(3,65,B2Bs(0))), echo = ec))
     time.sleep(0.5)      time.sleep(0.5)
     with stopwatch():      with stopwatch():
-       dx.WriteBulkData((dx.TBulkW(1,116, dx.L2B(2048)), dx.TBulkW(2,65,dx.B2B(1)), dx.TBulkW(3,65,dx.B2B(1))), echo)+       print(' 4', dx.BulkWrite((dx.TBulkW(1,116, L2Bs(2048)), dx.TBulkW(2,65,B2Bs(1)), dx.TBulkW(3,65,B2Bs(1))), echo = ec))
     time.sleep(0.5)      time.sleep(0.5)
     with stopwatch():      with stopwatch():
-       dx.WriteBulkData((dx.TBulkW(1,65,bytes(0)),dx.TBulkW(2,6​5,bytes(0)),dx.TBulkW(3,65,bytes(0))), echo) +       print(' 5', dx.BulkWrite((dx.TBulkW(1,65,bytes(0)),dx.TBulkW(2,6​5,bytes(0)),dx.TBulkW(3,65,bytes(0))), echo = ec)
-     dx.WriteByteData(ID, 64, 0, echo)+ 
 +     dx.Write8(ID, 64, 0, echo = ec)
   except Exception as ex:    except Exception as ex:
 +     pass
     import traceback      import traceback
     print('--- Caught Exception ---')      print('--- Caught Exception ---')
     traceback.print_exc()      traceback.print_exc()
     print('------------------------')      print('------------------------')
 +     '''
 +     trace = []
 +     tb = ex.__traceback__
 +     while tb is not None:
 +       trace.append({
 +         "filename": tb.tb_frame.f_code.co_filename,
 +         "name": tb.tb_frame.f_code.co_name,
 +         "lineno": tb.tb_lineno
 +       })
 +       tb = tb.tb_next
 +     print(str({
 +       'type': type(ex).__name__,
 +       'message': str(ex),
 +       'trace': trace
 +     }))
 +     '''
   fin = True    fin = True
-  def func2(): +  def func2(dx): 
-   global dx, ID, fin, echo+   global ID, fin, ec
-   time.sleep(1) 
   while not fin:    while not fin:
     try:      try:
-       dx.WriteByteData(ID, 65, dx.ReadByteData(ID, 65, echo) ^ 1, echo)+       l = dx.Read8(ID, 65, echo = ec) 
 +       if l != None: 
 +         l ^= 1 
 +         dx.Write8(ID, 65, l, echo = ec)
       time.sleep(0.05)        time.sleep(0.05)
     except Exception as ex:      except Exception as ex:
Line 1701: Line 2137:
       return        return
-  th1 = Thread(target=func1) +  def func3(dx): 
-  th2 = Thread(target=func2) +   global ID, fin, ec 
-  th1.start() + 
-  th2.start() +   try: 
-  th1.join() +     """ dump memory """ 
-  th2.join()+     l = 20 
 +     for addr in range(0, 250, l): 
 +       r = dx.Read(ID, addr, l, echo = ec) 
 +       print(f'Read({addr};{l})=',r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!') 
 + 
 +     ''' 
 +     """ reset dxl """ 
 +     with stopwatch(): 
 +       r = dx.FactoryReset(ID, echo = ec) 
 +     print(f'FactoryReset({ID}): {r}') 
 +     time.sleep(0.5) 
 +     ''' 
 + 
 +     """ ping dxl """ 
 +     print(f'Ping({ID})=', dx.Ping(ID, echo = ec)) 
 + 
 +     """ reboot dxl """ 
 +     with stopwatch(): 
 +       r = dx.Reboot(ID, echo = ec) 
 +     print(f'Reboot({ID})={r}') 
 +     time.sleep(1) 
 + 
 +     """ basic packet proc """ 
 +     with stopwatch(): 
 +       with dx.mutex: 
 +         r0 = dx.TxPacket(ID, dx.INST_WRITE, (25, 1)) 
 +         r1 = dx.RxPacket() 
 +     print(f'TxPacket({ID})={r0[1]}', r0[0].hex(':')) 
 +     if r1[0]: 
 +       print(f'RxPacket({ID})={r1[1]}', r1[0].hex(':')) 
 + 
 +     """ read byte item """ 
 +     with stopwatch(): 
 +       r = dx.Read8(ID, 25, echo = ec) 
 +     print(f'Read8({ID})={r}') 
 + 
 +     """ sync write inst. """ 
 +     for i in range(30): 
 +       with stopwatch(): 
 +         dx.SyncWrite(25, 1, (dx.TSyncW(1, B2Bs(1)), dx.TSyncW(2, B2Bs(1)), dx.TSyncW(3, B2Bs(1))), echo = ec) 
 +       time.sleep(0.05) 
 +       with stopwatch(): 
 +         dx.SyncWrite(25, 1, (dx.TSyncW(1, B2Bs(0)), dx.TSyncW(2, B2Bs(0)), dx.TSyncW(3, B2Bs(0))), echo = ec) 
 +       time.sleep(0.05) 
 + 
 +     """ set goal position """ 
 +     #torque off 
 +     if dx.Write8(ID, 24, 0, echo = ec): 
 +       #multi turn 
 +       if dx.Write16(ID, 6, (4095, 4095), echo = ec): 
 +         #torque on 
 +         if dx.Write8(ID, 24, 1, echo = ec): 
 +           print(f'Write16/Read16({ID})') 
 +           for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)): 
 +             for i in range(10): 
 +               #set goal position 
 +               with stopwatch(): 
 +                 dx.Write16(ID, 30, gp, echo = ec) 
 +               #get present position 
 +               with stopwatch(): 
 +                 pp = dx.Read16(ID, 36, signed = True, echo = ec) 
 +               if pp != None: 
 +                 print(f' w={gp:6} r={pp:6} diff={gp-pp:5}', end='\r') 
 +               else: 
 +                 pass 
 +                 print('None                            ', end='\r') 
 +                 break 
 +               time.sleep(0.005) 
 +             else: 
 +               continue 
 +             pass 
 +             break 
 +           print('') 
 +           #torque off 
 +           dx.Write8(ID, 24, 0, echo = ec) 
 +           #normal turn 
 +           with stopwatch(): 
 +             dx.Write16(ID, 6, (0, 4095), echo = ec) 
 +   except Exception as ex: 
 +     pass 
 +     import traceback 
 +     print('--- Caught Exception ---') 
 +     traceback.print_exc() 
 +     print('------------------------') 
 + 
 +  import sys 
 +  import platform 
 +  print(sys.version) 
 + 
 +  try: 
 +   dx = DXLProtocolV2('\\\\.\\COM12', 3000000) 
 +  except: 
 +   pass 
 +  else: 
 +   th1 = Thread(target = func1, args = (dx,)
 +    th2 = Thread(target = func2, args = (dx,)
 +    th1.start() 
 +    th2.start() 
 +    th1.join() 
 +    th2.join() 
 +   del th1, th2, dx 
 + 
 +  try: 
 +   dx = DXLProtocolV1('\\\\.\\COM12', 57600) 
 +  except: 
 +   dx = None 
 +  else: 
 +   th3 = Thread(target = func3, args = (dx,)) 
 +   th3.start() 
 +   th3.join() 
 +   del th3, dx 
  print('fin')   print('fin')
 +</pre>
 +}}
 +
 +****Pythonその3 [#v9d3114a]
 +[[Python2>#k91ab321]]ついでに[[MicroPython>https://micropython.org/]]​の互換性レベルを図ってみた。~
 +欲しいライブラリが用意されていない以上に、基本的な部分が結構緩い。また実際に動かすとそれなりに遅い。それでもMicroPythonは存在しているし、個人的にはかつてのBASICの感覚が蘇ってきたので、ちょっとした使い方であれば選択肢に入れても良いかなと。~
 +ひとまず[[PicoSHIELD]]+[[Raspberry Pi Pico>https://micropython.org/download/RP​I_PICO/]]上で動くように修正してみた。オーバーロックしているのはご愛敬として、マルチスレッドを除外した程度で実質PC版と大差無い。@micropython.viperを多用した時はかなり高速化できていたのだが、ここはコードの互換性を優先。
 +#html{{
 +<pre class="brush: python;"  title="pyDXL.py">
 +# pyDXL.py for MicroPython(RP2040)
 +# SPDX-License-Identifier: MIT
 +# SPDX-FileCopyrightText: (C) 2024 mukyokyo
 +
 +
 +from machine import UART, Pin
 +import time, array
 +from collections import namedtuple
 +from struct import pack, unpack
 +
 +########################################​##################
 +# Functionalized the part of converting int to bytes.
 +# If specified as a tuple, it is converted to bytes at once.
 +########################################​##################
 +@micropython.native
 +def B2Bs(d) -> bytes:
 +  if isinstance(d, list) or isinstance(d, tuple):
 +   return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d)
 +  else:
 +   return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d))
 +
 +@micropython.native
 +def W2Bs(d) -> bytes:
 +  if isinstance(d, list) or isinstance(d, tuple):
 +   return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]])
 +  else:
 +   return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d))
 +
 +@micropython.native
 +def L2Bs(d) -> bytes:
 +  if isinstance(d, list) or isinstance(d, tuple):
 +   return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]])
 +  else:
 +   return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d))
 +
 +########################################​##################
 +# API for Dynamixel protocol V1
 +########################################​##################
 +class DXLProtocolV1:
 +  BROADCASTING_ID = 0xfe
 +  INST_PING          = 0x01
 +  INST_READ          = 0x02
 +  INST_WRITE          = 0x03
 +  INST_REG_WRITE      = 0x04
 +  INST_ACTION        = 0x05
 +  INST_FACTORY_RESET  = 0x06
 +  INST_REBOOT        = 0x08
 +  INST_SYNC_WRITE    = 0x83
 +  INST_SYNG_REG_WRITE = 0x85
 +
 +  TSyncW = namedtuple("TSyncW", ("id", ("data")))
 +
 +  def __init__(self, baudrate = 57600, timeout_ms = 50):
 +   """
 +   Initalize
 +
 +   parameters
 +   -------------
 +   port : int
 +     UART no.
 +   baudrate : int
 +     Serial baudrate[bps]
 +   timeout : float
 +     Read timeout[s]
 +   """
 +   self.__Error = 0
 +   self.__baudrate = baudrate
 +   self.__timeout = timeout_ms
 +   self.__serial = UART(1, baudrate, tx = Pin(4), rx = Pin(5), txbuf = 500, rxbuf = 500, timeout = timeout_ms)
 +
 +  @property
 +  def baudrate(self):
 +   return self.__baudrate
 +
 +  @baudrate.setter
 +  def baudrate(self, baudrate):
 +   self.__baudrate = baudrate
 +   self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout)
 +
 +  @property
 +  def timeout(self):
 +   return self.__timeout
 +
 +  @timeout.setter
 +  def timeout(self, timeout_ms):
 +   self.__timeout = timeout_ms
 +   self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout)
 +
 +  @property
 +  def Error(self):
 +   return self.__Error
 +
 +  @micropython.native
 +  def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> (bytes, bool):
 +   """
 +   Sending packets
 +
 +   parameters
 +   -------------
 +   id : int
 +     Target ID
 +   inst : int
 +     Instruction command
 +   param : bytes
 +     Packet parameters
 +
 +   Returns
 +   -------
 +   bytes
 +     Packets sent
 +   bool
 +     Success or failure
 +   """
 +   if ((id == self.BROADCASTING_ID) or (id >= 0 and id <= 253)) and len(param) <= (256 - 6):
 +     instp = bytearray([0xff,0xff,id,0,inst]) + bytes(param)
 +     instp[3] = len(instp) - 3
 +     instp += B2Bs(~sum(instp[2:]) & 0xff)
 +     _ = self.__serial.read(self.__serial.any())
 +     if echo: print('TX:', instp.hex(':'))
 +     self.__serial.write(instp)
 +     return bytes(instp), True
 +   return None, False
 +
 +  @micropython.native
 +  def __rx(self, length) -> bytes:
 +   s = self.__serial.read(length)
 +   if s:
 +     l = len(s)
 +     if l == length:
 +       return s
 +     else:
 +       r = s
 +       length -= l
 +       while self.__serial.any() > 0:
 +         s = self.__serial.read(length)
 +         r += s
 +         length -= len(s)
 +         if length == 0:
 +           break
 +       return r
 +   else:
 +     return bytes()
 +
 +  @micropython.native
 +  def RxPacket(self, echo = False) -> (bytes, bool):
 +   """
 +   Receiving packets
 +
 +   Returns
 +   -------
 +   bytes
 +     Packets received
 +   bool
 +     Success or failure
 +   """
 +   self.__serial.flush()
 +   statp = self.__rx(5)
 +   if statp:
 +     if len(statp) == 5:
 +       l = statp[3] - 1
 +       self.__Error = statp[4]
 +       statp += self.__rx(l)
 +       if len(statp) == l + 5:
 +         if statp[-1:][0] == ((~sum(statp[2:-1])) & 0xff):
 +           if echo: print('RX:', statp.hex(':'))
 +           return bytes(statp), (statp[4] & 0x40) == 0
 +   return None, False
 +
 +  @micropython.native
 +  def Write(self, id : int, addr : int, data : bytes, echo = False) -> bool:
 +   """
 +   Write instruction
 +
 +   parameters
 +   -------------
 +   id : int
 +     Target ID
 +   addr : int
 +     Target item address
 +   data : bytes
 +     Data to be written
 +
 +   Returns
 +   -------
 +   result : bool
 +     Success or failure
 +   """
 +   if id >= 0 and id <= self.BROADCASTING_ID and addr >= 0  and addr <= 254:
 +     if self.TxPacket(id, self.INST_WRITE, B2Bs(addr) + data, echo)[1]:
 +       if id != self.BROADCASTING_ID:
 +         dat, r = self.RxPacket(echo)
 +         if r:
 +           return dat[2] == id and (dat[4] & 0x18) == 0
 +       else:
 +         return True
 +   return False
 +
 +  @micropython.native
 +  def Write8(self, id : int, addr : int, data : int | tuple | list, echo = False) -> bool:
 +   return self.Write(id, addr, B2Bs(data), echo)
 +
 +  @micropython.native
 +  def Write16(self, id : int, addr : int, data : int | tuple | list, echo = False) -> bool:
 +   return self.Write(id, addr, W2Bs(data), echo)
 +
 +  @micropython.native
 +  def Write32(self, id : int, addr : int, data : int | tuple | list, echo = False) -> bool:
 +   return self.Write(id, addr, L2Bs(data), echo)
 +
 +  @micropython.native
 +  def Read(self, id : int, addr : int, length : int, echo = False) -> (bytes, bool):
 +   """
 +   Read instruction
 +
 +   parameters
 +   -------------
 +   id : int
 +     Target ID
 +   addr : int
 +     Target item address
 +   length : int
 +     Number of bytes to read
 +
 +   Returns
 +   -------
 +   bytes
 +     Data read
 +   bool
 +     Success or failure
 +   """
 +   if id >= 0 and id <= 253 and addr >= 0 and addr <= 254 and length > 0 and length <= (256 - 6):
 +     if self.TxPacket(id, self.INST_READ, B2Bs((addr, length)), echo)[1]:
 +       dat, r = self.RxPacket(echo)
 +       if r:
 +         if dat[2] == id and (dat[4] & 0x8) == 0:
 +           return bytes(dat[5:-1])
 +   return None
 +
 +  @micropython.native
 +  def Read8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
 +   r = self.Read(id, addr, length, echo)
 +   if r != None:
 +     n = unpack('<' + (('b'*length) if signed else ('B'*length)), r[0:1 * length])
 +     return n if length > 1 else n[0]
 +   return None
 +
 +  @micropython.native
 +  def Read16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
 +   r = self.Read(id, addr, 2 * length, echo)
 +   if r != None:
 +     n = unpack('<' + (('h'*length) if signed else ('H'*length)), r[0:2 * length])
 +     return n if length > 1 else n[0]
 +   return None
 +
 +  @micropython.native
 +  def Read32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
 +   r = self.Read(id, addr, 4 * length, echo)
 +   if r != None:
 +     n = unpack('<' + (('i'*length) if signed else ('I'*length)), r[0:4 * length])
 +     return n if length > 1 else n[0]
 +   return None
 +
 +  @micropython.native
 +  def SyncWrite(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool:
 +   """
 +   Sync Write instruction
 +
 +   parameters
 +   -------------
 +   addr : int
 +     Target item address
 +   length : int
 +     Number of bytes to write
 +   id_datas : (TSyncW)
 +     Target ID and data
 +
 +   Returns
 +   -------
 +   bool
 +     Success or failure
 +   """
 +   if addr >= 0 and addr <= 254 and length > 0 and length < (256 - 6):
 +     param = B2Bs((addr,length))
 +     for d in id_datas:
 +       param += B2Bs(d.id) + d.data
 +       if len(d.data) != length or d.id < 0 or d.id > 253:
 +         del param
 +         return False
 +     del param
 +     return self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)[1]
 +   return False
 +
 +  @micropython.native
 +  def Ping(self, id : int, echo = False) -> bool:
 +   if self.TxPacket(id, self.INST_PING, bytes(), echo):
 +     dat, r = self.RxPacket(echo)
 +     if r:
 +       return id == dat[2] and dat[3] == 2
 +   return False
 +
 +  @micropython.native
 +  def FactoryReset(self, id : int, echo = False) -> bool:
 +   if self.TxPacket(id, self.INST_RESET, bytes(), echo):
 +     dat, r = self.RxPacket(echo)
 +     if r:
 +       return id == dat[2] and dat[3] == 2
 +   return False
 +
 +  @micropython.native
 +  def Reboot(self, id : int, echo = False) -> bool:
 +   if self.TxPacket(id, self.INST_REBOOT, bytes(), echo):
 +     dat, r = self.RxPacket(echo)
 +     if r:
 +       return id == dat[2] and dat[3] == 2
 +   return False
 +
 +########################################​##################
 +# API for Dynamixel protocol V2
 +########################################​##################
 +class DXLProtocolV2:
 +  BROADCASTING_ID = const(0xfe)
 +  INST_PING                = const(0x01)
 +  INST_READ                = const(0x02)
 +  INST_WRITE                = const(0x03)
 +  INST_REG_WRITE            = const(0x04)
 +  INST_ACTION              = const(0x05)
 +  INST_FACTORY_RESET        = const(0x06)
 +  INST_REBOOT              = const(0x08)
 +  INST_SYS_WRITE            = const(0x0d)
 +  INST_CLEAR                = const(0x10)
 +  INST_CONTROL_TABLE_BACKUP = const(0x20)
 +  INST_STATUS              = const(0x55)
 +  INST_SYNC_READ            = const(0x82)
 +  INST_SYNC_WRITE          = const(0x83)
 +  INST_SYNG_REG_WRITE      = const(0x85)
 +  INST_FAST_SYNC_READ      = const(0x8a)
 +  INST_BULK_READ            = const(0x92)
 +  INST_BULK_WRITE          = const(0x93)
 +  INST_FAST_BULK_READ      = const(0x9a)
 +
 +  TSyncW = namedtuple("TSyncW", ("id", ("data")))
 +  TBulkW = namedtuple("TBulkW", ("id", "addr", ("data")))
 +  TBulkR = namedtuple("TBulkR", ("id", "addr", "length"))
 +
 +  __crc16_lutable = array.array('H', (
 +   0x0000, 0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014, 0x8011,
 +   0x8033, 0x0036, 0x003C, 0x8039, 0x0028, 0x802D, 0x8027, 0x0022,
 +   0x8063, 0x0066, 0x006C, 0x8069, 0x0078, 0x807D, 0x8077, 0x0072,
 +   0x0050, 0x8055, 0x805F, 0x005A, 0x804B, 0x004E, 0x0044, 0x8041,
 +   0x80C3, 0x00C6, 0x00CC, 0x80C9, 0x00D8, 0x80DD, 0x80D7, 0x00D2,
 +   0x00F0, 0x80F5, 0x80FF, 0x00FA, 0x80EB, 0x00EE, 0x00E4, 0x80E1,
 +   0x00A0, 0x80A5, 0x80AF, 0x00AA, 0x80BB, 0x00BE, 0x00B4, 0x80B1,
 +   0x8093, 0x0096, 0x009C, 0x8099, 0x0088, 0x808D, 0x8087, 0x0082,
 +   0x8183, 0x0186, 0x018C, 0x8189, 0x0198, 0x819D, 0x8197, 0x0192,
 +   0x01B0, 0x81B5, 0x81BF, 0x01BA, 0x81AB, 0x01AE, 0x01A4, 0x81A1,
 +   0x01E0, 0x81E5, 0x81EF, 0x01EA, 0x81FB, 0x01FE, 0x01F4, 0x81F1,
 +   0x81D3, 0x01D6, 0x01DC, 0x81D9, 0x01C8, 0x81CD, 0x81C7, 0x01C2,
 +   0x0140, 0x8145, 0x814F, 0x014A, 0x815B, 0x015E, 0x0154, 0x8151,
 +   0x8173, 0x0176, 0x017C, 0x8179, 0x0168, 0x816D, 0x8167, 0x0162,
 +   0x8123, 0x0126, 0x012C, 0x8129, 0x0138, 0x813D, 0x8137, 0x0132,
 +   0x0110, 0x8115, 0x811F, 0x011A, 0x810B, 0x010E, 0x0104, 0x8101,
 +   0x8303, 0x0306, 0x030C, 0x8309, 0x0318, 0x831D, 0x8317, 0x0312,
 +   0x0330, 0x8335, 0x833F, 0x033A, 0x832B, 0x032E, 0x0324, 0x8321,
 +   0x0360, 0x8365, 0x836F, 0x036A, 0x837B, 0x037E, 0x0374, 0x8371,
 +   0x8353, 0x0356, 0x035C, 0x8359, 0x0348, 0x834D, 0x8347, 0x0342,
 +   0x03C0, 0x83C5, 0x83CF, 0x03CA, 0x83DB, 0x03DE, 0x03D4, 0x83D1,
 +   0x83F3, 0x03F6, 0x03FC, 0x83F9, 0x03E8, 0x83ED, 0x83E7, 0x03E2,
 +   0x83A3, 0x03A6, 0x03AC, 0x83A9, 0x03B8, 0x83BD, 0x83B7, 0x03B2,
 +   0x0390, 0x8395, 0x839F, 0x039A, 0x838B, 0x038E, 0x0384, 0x8381,
 +   0x0280, 0x8285, 0x828F, 0x028A, 0x829B, 0x029E, 0x0294, 0x8291,
 +   0x82B3, 0x02B6, 0x02BC, 0x82B9, 0x02A8, 0x82AD, 0x82A7, 0x02A2,
 +   0x82E3, 0x02E6, 0x02EC, 0x82E9, 0x02F8, 0x82FD, 0x82F7, 0x02F2,
 +   0x02D0, 0x82D5, 0x82DF, 0x02DA, 0x82CB, 0x02CE, 0x02C4, 0x82C1,
 +   0x8243, 0x0246, 0x024C, 0x8249, 0x0258, 0x825D, 0x8257, 0x0252,
 +   0x0270, 0x8275, 0x827F, 0x027A, 0x826B, 0x026E, 0x0264, 0x8261,
 +   0x0220, 0x8225, 0x822F, 0x022A, 0x823B, 0x023E, 0x0234, 0x8231,
 +   0x8213, 0x0216, 0x021C, 0x8219, 0x0208, 0x820D, 0x8207, 0x0202
 +  ))
 +
 +  def __init__(self, baudrate = 57600, timeout_ms = 50):
 +   """
 +   Initalize
 +
 +   parameters
 +   -------------
 +   port : int
 +     UART no.
 +   baudrate : int
 +     Serial baudrate[bps]
 +   timeout : float
 +     Read timeout[s]
 +   """
 +   self.__Error = 0
 +   self.__baudrate = baudrate
 +   self.__timeout = timeout_ms
 +   self.__serial = UART(1, baudrate, tx = Pin(4), rx = Pin(5), txbuf = 500, rxbuf = 500, timeout = timeout_ms)
 +
 +  @property
 +  def baudrate(self):
 +   return self.__baudrate
 +
 +  @baudrate.setter
 +  def baudrate(self, baudrate):
 +   self.__baudrate = baudrate
 +   self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout)
 +
 +  @property
 +  def timeout(self):
 +   return self.__timeout
 +
 +  @timeout.setter
 +  def timeout(self, timeout_ms):
 +   self.__timeout = timeout_ms
 +   self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout)
 +
 +  @property
 +  def Error(self):
 +   return self.__Error
 +
 +  @micropython.viper
 +  def __crc16(self, dat : ptr8, l:int) -> int:
 +   crc:int = 0
 +   pt = ptr16(self.__crc16_lutable)
 +   for i in range(l):
 +     crc = (crc << 8) ^ pt[(((crc >> 8) ^ dat[i]) & 0xff)]
 +   return crc & 0xffff
 +
 +  @micropython.native
 +  def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> (bytes, bool):
 +   """
 +   Sending packets
 +
 +   parameters
 +   -------------
 +   id : int
 +     Target ID
 +   inst : int
 +     Instruction command
 +   param : bytes
 +     Packet parameters
 +
 +   Returns
 +   -------
 +   bytes
 +     Packets sent
 +   bool
 +     Success or failure
 +   """
 +   if ((id == self.BROADCASTING_ID) or (id >= 0 and id <= 252)) and len(param) < (65536 - 10):
 +     instp = bytearray([0xff,0xff,0xfd,0x00,id,0,0,in​st]) + bytearray(param).replace(b'\xff\xff\xfd'​,b'\xff\xff\xfd\xfd')
 +     instp[5:7] = W2Bs(len(instp) - 5)
 +     instp += W2Bs(self.__crc16(instp, len(instp)))
 +     _ = self.__serial.read(self.__serial.any())
 +     if echo: print('TX:', instp.hex(':'))
 +     self.__serial.write(instp)
 +     return bytes(instp), True
 +   return None, False
 +
 +  @micropython.native
 +  def __rx(self, length) -> bytes:
 +   s = self.__serial.read(length)
 +   if s:
 +     l = len(s)
 +     if l == length:
 +       return s
 +     else:
 +       r = s
 +       length -= l
 +       while self.__serial.any() > 0:
 +         s = self.__serial.read(length)
 +         r += s
 +         length -= len(s)
 +         if length == 0:
 +           break
 +       return r
 +   else:
 +     return bytes()
 +
 +  @micropython.native
 +  def RxPacket(self, echo = False) -> (bytes, bool):
 +   """
 +   Receiving packets
 +
 +   Returns
 +   -------
 +   bytes
 +     Packets received
 +   bool
 +     Success or failure
 +   """
 +   self.__serial.flush()
 +   statp = self.__rx(9)
 +   if statp:
 +     if len(statp) == 9:
 +       if statp[0] == 0xff and statp[1] == 0xff and statp[2] == 0xfd and statp[3] == 0 and statp[7] == 0x55:
 +         l = unpack('<H', statp[5:7])[0] - 2
 +         self.__Error = statp[8]
 +         statp += self.__rx(l)
 +         if len(statp) == l + 9:
 +           if unpack('<H', statp[-2:])[0] == self.__crc16(statp[:-2], len(statp[:-2])):
 +             statp = statp[0:9] + statp[9:].replace(b'\xff\xff\xfd\xfd',b'​\xff\xff\xfd')
 +             if echo: print('RX:', statp.hex(':'))
 +             return bytes(statp), (statp[8] & 0x7f) == 0
 +   return None, False
 +
 +  @micropython.native
 +  def Write(self, id : int, addr : int, data: bytes, echo = False) -> bool:
 +   """
 +   Write instruction
 +
 +   parameters
 +   -------------
 +   id : int
 +     Target ID
 +   addr : int
 +     Target item address
 +   data : bytes
 +     Data to be written
 +
 +   Returns
 +   -------
 +   result : bool
 +     Success or failure
 +   """
 +   if ((id >= 0 and id <= 252) or (id == self.BROADCASTING_ID)) and addr >= 0 and addr <= 65535:
 +     if self.TxPacket(id, self.INST_WRITE, W2Bs(addr) + data, echo)[1]:
 +       if id != self.BROADCASTING_ID:
 +         dat, r = self.RxPacket(echo)
 +         if r:
 +           return dat[4] == id and (dat[8] & 0x7f) == 0
 +         else:
 +           return True
 +   return False
 +
 +  @micropython.native
 +  def Write8(self, id : int, addr : int, data : int | tuple | list, echo = False):
 +   return self.Write(id, addr, B2Bs(data), echo)
 +
 +  @micropython.native
 +  def Write16(self, id : int, addr : int, data : int | tuple | list, echo = False):
 +   return self.Write(id, addr, W2Bs(data), echo)
 +
 +  @micropython.native
 +  def Write32(self, id : int, addr : int, data : int | tuple | list, echo = False):
 +   return self.Write(id, addr, L2Bs(data), echo)
 +
 +  @micropython.native
 +  def Read(self, id : int, addr : int, length : int, echo = False) -> (bytes, bool):
 +   """
 +   Read instruction
 +
 +   parameters
 +   -------------
 +   id : int
 +     Target ID
 +   addr : int
 +     Target item address
 +   length : int
 +     Number of bytes to read
 +
 +   Returns
 +   -------
 +   bytes
 +     Data read
 +   bool
 +     Success or failure
 +   """
 +   if id >= 0 and id <= 252 and addr >= 0 and addr <= 65535 and length > 0 and length < (65536 - 10):
 +     if self.TxPacket(id, self.INST_READ, W2Bs(addr) + W2Bs(length), echo)[1]:
 +       dat, r = self.RxPacket(echo)
 +       if r:
 +         return bytes(dat[9:-2])
 +   return None
 +
 +  @micropython.native
 +  def Read8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
 +   r = self.Read(id, addr, length, echo)
 +   if r:
 +     n = unpack('<' + (('b'*length) if signed else ('B'*length)), r[0:1 * length])
 +     return n if length > 1 else n[0]
 +   return None
 +
 +  @micropython.native
 +  def Read16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
 +   r = self.Read(id, addr, 2 * length, echo)
 +   if r != None:
 +     n = unpack('<' + (('h'*length) if signed else ('H'*length)), r[0:2 * length])
 +     return n if length > 1 else n[0]
 +   return None
 +
 +  @micropython.native
 +  def Read32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
 +   r = self.Read(id, addr, 4 * length, echo)
 +   if r != None:
 +     n = unpack('<' + (('i'*length) if signed else ('I'*length)), r[0:4 * length])
 +     return n if length > 1 else n[0]
 +   return None
 +
 +  @micropython.native
 +  def SyncWrite(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool:
 +   """
 +   Sync Write instruction
 +
 +   parameters
 +   -------------
 +   addr : int
 +     Target item address
 +   length : int
 +     Number of bytes to write
 +   id_datas : (TSyncW)
 +     Target ID and data
 +
 +   Returns
 +   -------
 +   bool
 +     Success or failure
 +   """
 +   if addr >= 0 and addr <= 65535 and length > 0 and length < (65536 - 10):
 +     param = W2Bs(addr) + W2Bs(length)
 +     for d in id_datas:
 +       param += bytes((d.id,)) + d.data
 +       if len(d.data) != length or d.id < 0 or d.id > 252:
 +         del param
 +         return False
 +     return self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)[1]
 +   return False
 +
 +  @micropython.native
 +  def SyncRead(self, addr : int, length : int, ids : (int), echo = False) -> tuple:
 +   """
 +   Sync Read instruction
 +
 +   parameters
 +   -------------
 +   addr : int
 +     Target item address
 +   length : int
 +     Number of bytes to read
 +   ids : (int)
 +     Target IDs
 +
 +   Returns
 +   -------
 +   tuple
 +     Read ID and data
 +   """
 +   result = ()
 +   if addr >= 0 and addr < 65535 and length > 0 and length < (65536 - 10):
 +     if self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_READ, W2Bs(addr) + W2Bs(length) + B2Bs(ids), echo)[1]:
 +       for id in ids:
 +         dat, r = self.RxPacket(echo)
 +         if r:
 +           if dat[4] == id:
 +             result += (id, bytes(dat[9:9 + length])),
 +           else:
 +             result += (id, bytes([])),
 +         else:
 +           result += (id, bytes([])),
 +   return result
 +
 +  @micropython.native
 +  def BulkWrite(self, data : (TBulkW), echo = False) -> bool:
 +   """
 +   Bulk Write instruction
 +
 +   parameters
 +   -------------
 +   addr : int
 +     Target item address
 +   length : int
 +     Number of bytes to write
 +   ids : (TSyncW)
 +     Target ID and data
 +
 +   Returns
 +   -------
 +   bool
 +     Success or failure
 +   """
 +   param = bytes()
 +   for d in data:
 +     if d.id >= 0 and d.id <= 252 and d.addr >= 0 and d.addr <= 65535:
 +       param += B2Bs(d.id) + W2Bs(d.addr) + W2Bs(len(d.data)) + d.data
 +     else:
 +       del param
 +       return False
 +   return self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_WRITE, param, echo)[1]
 +
 +  @micropython.native
 +  def BulkRead(self, data:(TBulkR), echo = False) -> tuple:
 +   """
 +   Bulk Read instruction
 +
 +   parameters
 +   -------------
 +   data : (TBulkR)
 +     ID, address, and number of bytes to be read
 +
 +   Returns
 +   -------
 +   tuple
 +     Read ID and data
 +   """
 +   result = ()
 +   param = bytes()
 +   for d in data:
 +     if d.id < 0 or d.addr < 0 or d.length < 0:
 +       return result
 +     param += B2Bs(d.id) + W2Bs(d.addr) + W2Bs(d.length)
 +   if self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_READ, param, echo)[1]:
 +     for d in data:
 +       rxd, r = self.RxPacket(echo)
 +       if r:
 +         if(d.id == rxd[4]):
 +           result += (d.id, bytes(rxd[9:-2])),
 +         else:
 +           result += (d.id, bytes([])),
 +       else:
 +         result += (d.id, bytes([])),
 +   del param
 +   return result
 +
 +  @micropython.native
 +  def Ping(self, id : int, echo = False) -> bool:
 +   if self.TxPacket(id, self.INST_PING, bytes(), echo)[1]:
 +     rxd, r = self.RxPacket(echo)
 +     if r:
 +       return id == rxd[4] and rxd[5] == 7 and rxd[6] == 0
 +   return False
 +
 +  @micropython.native
 +  def FactoryReset(self, id : int, p1 : int, echo = False) -> bool:
 +   if self.TxPacket(id, self.INST_FACTORY_RESET, B2Bs(p1), echo)[1]:
 +     rxd, r = self.RxPacket(echo)
 +     if r:
 +       return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0
 +   return False
 +
 +  @micropython.native
 +  def Reboot(self, id : int, echo = False) -> bool:
 +   if self.TxPacket(id, self.INST_REBOOT, bytes(), echo)[1]:
 +     rxd, r = self.RxPacket(echo)
 +     if r:
 +       return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0
 +   return False
 +
 +  @micropython.native
 +  def Clear(self, id : int, val, echo = False) -> bool:
 +   if self.TxPacket(id, self.INST_CLEAR, B2Bs(1) + L2Bs(val), echo)[1]:
 +     rxd, r = self.RxPacket(echo)
 +     if r:
 +       return id == rxd[4] and rxd[5] == 4 and rxd[6] == 0
 +   return False
 +
 +########################################​##################
 +# test code
 +########################################​##################
 +if __name__ == "__main__":
 +  import uasyncio as asyncio
 +  from ucontextlib import contextmanager
 +  import gc
 +
 +  try:
 +   led = Pin('LED', Pin.OUT)
 +  except TypeError:
 +   led = Pin(25, Pin.OUT)
 +  led.off()
 +
 +  dx = None
 +  ec = False
 +  ID = 1
 +  fin = False
 +
 +  async def test1():
 +   global dx, ec, ID, fin
 +   @contextmanager
 +   def stopwatch():
 +     led.toggle()
 +     start_time = time.ticks_us()
 +     yield
 +     #print('..proc time={:.2f}ms {}'.format((time.ticks_us() - start_time) / 1000, gc.mem_alloc()))
 +
 +   # Basic operation check
 +   try:
 +     '''
 +     with stopwatch():
 +       r = dx.FactoryReset(ID, echo=ec)
 +     print(f'Reset({ID}): {r}')
 +     dx.baudrate = 57600
 +     ID = 1
 +     await asyncio.sleep(0.5)
 +     '''
 +
 +     """ ping dxl """
 +     for i in range(10):
 +       with stopwatch():
 +         print(f'Ping({i}):', dx.Ping(i, echo=ec))
 +
 +     """ reboot dxl """
 +     with stopwatch():
 +       r = dx.Reboot(ID, echo=ec)
 +     print(f'Reboot({ID})= {r}')
 +     await asyncio.sleep(0.5)
 +
 +     """ basic packet proc """
 +     with stopwatch():
 +       r0 = dx.TxPacket(ID, dx.INST_WRITE, (65, 0, 1))
 +       r1 = dx.RxPacket()
 +     print('TxPacket({})= {} {}'.format(ID, r0[1], r0[0].hex(':') if r0[0] else '--'))
 +     print('RxPacket({})= {} {}'.format(ID, r1[1], r1[0].hex(':') if r1[0] else '--'))
 +
 +     """ dump memory """
 +     l = 50
 +     for addr in range(0, 700, l):
 +       r = dx.Read(ID, addr, l, echo = ec)
 +       print(f'Read({addr};{l})=',r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!')
 +
 +     """ read byte item """
 +     with stopwatch():
 +       r = dx.Read8(ID, 65, echo=ec)
 +     print(f'Read8({ID},65)= {r}')
 +     """ read word item """
 +     with stopwatch():
 +       r = dx.Read16(ID, 120, echo=ec)
 +     print(f'Read16({ID},120)= {r}')
 +     """ read long item """
 +     with stopwatch():
 +       r = dx.Read32(ID, 132, echo=ec)
 +     print(f'Read32({ID},132)= {r}')
 +
 +     """ sync read inst. """
 +     with stopwatch():
 +       d = dx.SyncRead(0, 4, (1, 2, 3, 4, 5, 6, 7, 8), echo=ec)
 +     print('SyncRead=')
 +     for d in d:
 +       print(f' ({d[0]},0)', d[1].hex(':') if len(d[1]) > 0 else ())
 +
 +     """ sync write inst. """
 +     for i in range(20):
 +       with stopwatch():
 +         dx.SyncWrite(65, 1, (dx.TSyncW(1, B2Bs(1)), dx.TSyncW(2, B2Bs(1)), dx.TSyncW(3, B2Bs(1))), echo=ec)
 +       await asyncio.sleep(0.05)
 +       with stopwatch():
 +         dx.SyncWrite(65, 1, (dx.TSyncW(1, B2Bs(0)), dx.TSyncW(2, B2Bs(0)), dx.TSyncW(3, B2Bs(0))), echo=ec)
 +       await asyncio.sleep(0.05)
 +
 +     """ set goal position """
 +     #torque off
 +     with stopwatch():
 +       dx.Write8(ID, 64, 0, echo=ec)
 +     #multi turn
 +     with stopwatch():
 +       dx.Write8(ID, 11, 4, echo=ec)
 +     #torque on
 +     with stopwatch():
 +       dx.Write8(ID, 64, 1, echo=ec)
 +     for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)):
 +       #set goal potition
 +       with stopwatch():
 +         dx.Write32(ID, 116, gp, echo=ec)
 +       #get present potition
 +       with stopwatch():
 +         pp = dx.Read32(ID, 132, signed = True, echo=ec)
 +       if pp:
 +         print(f'Goal Pos={gp:6d}, Present Pos={pp:6d}', end='\r')
 +       else:
 +         break
 +       await asyncio.sleep(0.05)
 +     else:
 +       print('')
 +     #torque off
 +     with stopwatch():
 +       dx.Write8(ID, 64, 0, echo=ec)
 +     #normal turn
 +     with stopwatch():
 +       dx.Write8(ID, 11, 3, echo=ec)
 +
 +     """ test add/remove suffixes """
 +     fffffd = bytes((
 +       0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
 +       0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
 +       0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
 +     ))
 +     with stopwatch():
 +       r = dx.Write(ID, 634, fffffd, echo=ec)
 +     print(f'Write({ID},634)= {r}')
 +     with stopwatch():
 +       r = dx.Read(ID, 634, len(fffffd), echo=ec)
 +     print(f'Read({ID},634)=', r.hex(':') if r else '--')
 +
 +     """ bulk read inst. """
 +     with stopwatch():
 +       d = dx.BulkRead((dx.TBulkR(1, 0, 10), dx.TBulkR(2, 0, 20), dx.TBulkR(3, 0, 30), dx.TBulkR(4, 0, 40), dx.TBulkR(6, 0, 50)), echo=ec)
 +     print('BulkRead=')
 +     for d in d:
 +       print(f' ({d[0]},0)', d[1].hex(':'))
 +
 +     """ bulk write inst. """
 +     dx.Write8(ID, 64, 1, echo=ec)
 +     with stopwatch():
 +       dx.BulkWrite((dx.TBulkW(1, 104, L2Bs((0,0,0,1024))), dx.TBulkW(2, 65, B2Bs(0)), dx.TBulkW(3, 65, B2Bs(0))), echo=ec)
 +     await asyncio.sleep(0.5)
 +     with stopwatch():
 +       dx.BulkWrite((dx.TBulkW(1, 104, L2Bs(0)+L2Bs(0)+L2Bs(0)+L2Bs(2048)), dx.TBulkW(2, 65, B2Bs(1)), dx.TBulkW(3, 65, B2Bs(1))), echo=ec)
 +     await asyncio.sleep(0.5)
 +     with stopwatch():
 +       dx.BulkWrite((dx.TBulkW(1, 104, L2Bs(0)+L2Bs(0)+L2Bs(0)+L2Bs(1024)), dx.TBulkW(2, 65, B2Bs(0)), dx.TBulkW(3, 65, B2Bs(0))), echo=ec)
 +     await asyncio.sleep(0.5)
 +     with stopwatch():
 +       dx.BulkWrite((dx.TBulkW(1, 116, L2Bs(2048)), dx.TBulkW(2,65, B2Bs(1)), dx.TBulkW(3, 65, B2Bs(1))), echo=ec)
 +     await asyncio.sleep(0.5)
 +     with stopwatch():
 +       dx.BulkWrite((dx.TBulkW(1, 65, B2Bs(0)), dx.TBulkW(2, 65, B2Bs(0)), dx.TBulkW(3, 65, B2Bs(0))), echo=ec)
 +     dx.Write8(ID, 64, 0, echo=ec)
 +
 +   except Exception as ex:
 +     print('--- Caught Exception ---')
 +     import sys
 +     sys.print_exception(ex)
 +     print('------------------------')
 +
 +   fin = True
 +
 +  async def test2():
 +   global dx, ec, ID, fin
 +   await asyncio.sleep(0.5)
 +   while not fin:
 +     try:
 +       dx.Write8(ID, 65, dx.Read8(ID, 65, echo=ec) ^ 1, echo=ec)
 +       await asyncio.sleep(0.02)
 +     except Exception as ex:
 +       print('--- Caught Exception ---')
 +       import sys
 +       sys.print_exception(ex)
 +       print('------------------------')
 +
 +  def test3():
 +   global dx, ec, ID, fin
 +
 +   # Basic operation check
 +   print('reset=', dx.Reboot(1, echo=ec))
 +   time.sleep(2)
 +
 +   print('ping=', dx.Ping(1, echo=ec))
 +
 +   dx.TxPacket(1, dx.INST_READ, (0x2b, 0x01), True)
 +   dx.RxPacket(True)
 +
 +   r = dx.Read(1, 0, 70, echo=ec)
 +   print(r.hex(','))
 +   print('len=',len(r))
 +
 +   for i in range(15):
 +     led = dx.Read8(1, 25, echo=ec) ^ 1
 +     dx.Write8(1, 25, led, echo=ec)
 +     print(f'led={led}', end = '\r')
 +     time.sleep(0.2)
 +   print()
 +
 +   dx.Write8(1, 24, 0, echo=ec)
 +   dx.Write16(1, 6, (4095, 4095), echo=ec)
 +
 +   for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)):
 +     dx.Write16(1, 30, gp, echo=ec)
 +     pp = dx.Read16(1, 36, signed=True, echo=ec)
 +     print(f'Goal Pos={gp:6d}, Present Pos={pp:6d}', end='\r')
 +     time.sleep(0.05)
 +   else:
 +     print('')
 +
 +   dx.Write8(1, 24, 0, echo=ec)
 +   dx.Write16(1, 6, (0, 4095),echo=ec)
 +
 +  machine.freq(250000000) # set the CPU frequency to 250 MHz
 +
 +  dx = DXLProtocolV2(baudrate = 3000000)
 +  loop = asyncio.get_event_loop()
 +  loop.create_task(test1())
 +  loop.create_task(test2())
 +  loop.run_forever()
 +
 +  time.sleep(1)
 +  dx = DXLProtocolV1(baudrate = 57600)
 +  test3()
</pre> </pre>
}} }}


Front page   Diff ReloadPrint View   Page list Search Recent changes   RSS of recent changes (RSS 1.0) RSS of recent changes (RSS 2.0) RSS of recent changes (RSS Atom)