13: 2023-09-07 (木) 18:49:45 takaboo ソース 14: 2024-01-08 (月) 13:54:25 takaboo ソース
Line 992: Line 992:
***他の言語 [#d5eca280] ***他の言語 [#d5eca280]
 +****Pythonその1 [#t51150f3]
[[DX2LIBのページ>DX2LIB#u3ff03d9]]に紹介されている通りだが、その中でも流行の[[Python>https://www.python.jp/]]で遊んでみても面白いかと思う。~ [[DX2LIBのページ>DX2LIB#u3ff03d9]]に紹介されている通りだが、その中でも流行の[[Python>https://www.python.jp/]]で遊んでみても面白いかと思う。~
GCC Developer LiteにはPythonが含まれており、コンパイルオプションで「Python(32bit)」や「Python(64bit)」を選ぶ事でライブラリに同梱されるPythonのサンプルソースコードが利用できる。しかし追加APIを利用したものばかりで、コントロールテーブルを読み書きする基本的なAPIは使われていない。 GCC Developer LiteにはPythonが含まれており、コンパイルオプションで「Python(32bit)」や「Python(64bit)」を選ぶ事でライブラリに同梱されるPythonのサンプルソースコードが利用できる。しかし追加APIを利用したものばかりで、コントロールテーブルを読み書きする基本的なAPIは使われていない。
Line 1190: Line 1191:
}} }}
追加APIに無い機能は基本APIを使ってコントロールテーブルへ直接アクセスするしかないため、参考にしてもらえればと思う。 追加APIに無い機能は基本APIを使ってコントロールテーブルへ直接アクセスするしかないため、参考にしてもらえればと思う。
 +
 +****Pythonその2 [#k91ab321]
 +全てPythonで書いてしまえば、Cで作られたライブラリをリンクしたり、ctypesの有象無象に呵まれる事も無くスッキリする。~
 +突如気が向いたので、インストラクションパケットの生成と送信、ステータスパケットの受信、インストラクションの種類によってパラメータ部分を結合・分解するクラスを作ってみた。前記の内容とは一線を画すので、互換性は一切考えていない。
 +[[pySerial>https://github.com/pyserial/p​yserial]]は必須。~
 +単体で一通りのAPIを試せるようにデモコードを付けておいた。ターゲットはXM/XH/XDシリーズ、IDは1~4(少なくともID=1は必須)、ボーレートは3Mbpsを想定、デバッグ用にpsutilが必要といったところだ。
 +#html{{
 +<pre class="brush: python;"  title="dx2lib.py">
 +#!/usr/bin/python3
 +# -*- coding: utf-8 -*-
 +#
 +# dx2lib.py
 +# (C) 2024 BestTechnology
 +
 +import serial, threading, array, struct
 +from collections import namedtuple
 +from struct import pack, unpack
 +
 +class dx2lib:
 +  BROADCASTING_ID = 0xfe
 +  INST_PING                = 0x01
 +  INST_READ                = 0x02
 +  INST_WRITE                = 0x03
 +  INST_REG_WRITE            = 0x04
 +  INST_ACTION              = 0x05
 +  INST_RESET                = 0x06
 +  INST_REBOOT              = 0x08
 +  INST_SYS_WRITE            = 0x0d
 +  INST_CLEAR                = 0x10
 +  INST_CONTROL_TABLE_BACKUP = 0x20
 +  INST_STATUS              = 0x55
 +  INST_SYNC_READ            = 0x82
 +  INST_SYNC_WRITE          = 0x83
 +  INST_SYNG_REG_WRITE      = 0x85
 +  INST_FAST_SYNC_READ      = 0x8a
 +  INST_BULK_READ            = 0x92
 +  INST_BULK_WRITE          = 0x93
 +  INST_FAST_BULK_READ      = 0x9a
 +
 +  TSyncW = namedtuple("TSyncW", ("id", ("data")))
 +  TBulkW = namedtuple("TBulkW", ("id", "addr", ("data")))
 +  TBulkR = namedtuple("TBulkR", ("id", "addr", "length"))
 +
 +  __crc16_lutable = array.array('H',[])
 +
 +  def __init__(self, port : str, baudrate : int = 57600, timeout : float  = 0.05):
 +   """
 +   Initalize
 +
 +   parameters
 +   -------------
 +   port : str
 +     Device name
 +   baudrate : int
 +     Serial baudrate[bps]
 +   timeout : float
 +     Read timeout[s]
 +   """
 +   self.__serial = serial.Serial(port, baudrate, timeout = timeout)
 +   self.__baudrate = baudrate
 +   self.__timeout = timeout;
 +   self.__mutex = threading.Lock()
 +   for i in range(256):
 +     nData = i << 8
 +     nAccum = 0
 +     poly = 0x8005
 +     for j in range(8):
 +       if ((nData ^ nAccum) & 0x8000):
 +         nAccum = (nAccum << 1) ^ poly
 +       else:
 +         nAccum <<= 1
 +       nData <<= 1
 +     self.__crc16_lutable.append(nAccum & 0xffff)
 +
 +  @property
 +  def mutex(self):
 +   return self.__mutex
 +
 +  @property
 +  def baudrate(self):
 +   return self.__ser.baudrate
 +
 +  @baudrate.setter
 +  def baudrate(self, baudrate):
 +   self.__baudrate = baudrate
 +   self.__serial.baudrate = baudrate
 +
 +  @property
 +  def timeout(self):
 +   return self.__serial.timeout
 +
 +  @timeout.setter
 +  def timeout(self, timeout):
 +   self.__timeout = timeout
 +   self.__serial.timeout = timeout
 +
 +  def __crc16(self, data : bytes) -> int:
 +   crc = 0
 +   for d in data:
 +     crc = (crc << 8) ^ self.__crc16_lutable[(((crc >> 8) ^ d) & 0xff)];
 +   return crc & 0xffff
 +
 +  def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> (bytes, bool):
 +   """
 +   Sending packets
 +
 +   parameters
 +   -------------
 +   id : int
 +     Target ID
 +   inst : int
 +     Instruction command
 +   param : bytearray
 +     Packet parameters
 +
 +   Returns
 +   -------
 +   bytes | None
 +     Packets sent
 +   bool
 +     Success or failure
 +   """
 +   if (id == self.BROADCASTING_ID) or (id <= 252):
 +     instp = bytearray([0xff,0xff,0xfd,0x00,id,0,0,in​st]) + bytearray(param).replace(b'\xff\xff\xfd'​,b'\xff\xff\xfd\xfd')
 +     instp[5:7] = bytes(tuple(pack('<H', len(instp) - 5)))
 +     instp += bytes(tuple(pack('<H', self.__crc16(instp))))
 +     self.__serial.reset_input_buffer()
 +     if echo: print('TX:', instp.hex(','))
 +     self.__serial.write(instp)
 +     return bytes(instp), True
 +   return None, False
 +
 +  def RxPacket(self, echo = False) -> (bytes, bool):
 +   """
 +   Receiving packets
 +
 +   Returns
 +   -------
 +   bytes | None
 +     Packets received
 +   bool
 +     Success or failure
 +   """
 +   statp = self.__serial.read(9)
 +   if statp:
 +     if len(statp) == 9:
 +       l = unpack('<H', statp[5:7])[0] - 2
 +       statp += self.__serial.read(l)
 +       if len(statp) == l + 9:
 +         if unpack('<H', statp[-2:])[0] == self.__crc16(statp[:-2]):
 +           statp = statp[0:9] + statp[9:].replace(b'\xff\xff\xfd\xfd',b'​\xff\xff\xfd')
 +           if echo: print('RX:', statp.hex(','))
 +           return bytes(statp), True
 +   return None, False
 +
 +  def WriteBlockData(self, id : int, addr : int, data : bytes, echo = False) -> bool:
 +   """
 +   Write instruction
 +
 +   parameters
 +   -------------
 +   id : int
 +     Target ID
 +   addr : int
 +     Target item address
 +   data : bytes
 +     Data to be written
 +
 +   Returns
 +   -------
 +   result : bool
 +     Success or failure
 +   """
 +   with self.__mutex:
 +     if (id <= 252) or (id == self.BROADCASTING_ID):
 +       _, r = self.TxPacket(id, self.INST_WRITE, memoryview(pack('<H', addr) + data), echo)
 +       if r:
 +         d, r = self.RxPacket(echo)
 +         return r and (d[4] == id)
 +     return False
 +
 +  def WriteByteData(self, id : int, addr : int, data : int, echo = False) -> bool:
 +   return self.WriteBlockData(id, addr, bytes((data,)), echo)
 +
 +  def WriteWordData(self, id : int, addr : int, data : int, echo = False) -> bool:
 +   return self.WriteBlockData(id, addr, memoryview(bytes(tuple(pack('<H', data)))), echo)
 +
 +  def WriteLongData(self, id : int, addr : int, data : int, echo = False) -> bool:
 +   return self.WriteBlockData(id, addr, memoryview(bytes(tuple(pack('<l', data)))), echo)
 +
 +  def ReadBlockData(self, id : int, addr : int, length : int, echo = False) -> (bytes, bool):
 +   """
 +   Read instruction
 +
 +   parameters
 +   -------------
 +   id : int
 +     Target ID
 +   addr : int
 +     Target item address
 +   length : int
 +     Number of bytes to read
 +
 +   Returns
 +   -------
 +   bytes | None
 +     Data read
 +   bool
 +     Success or failure
 +   """
 +   with self.__mutex:
 +     if (id <= 252) and (length > 0):
 +       _, r = self.TxPacket(id, self.INST_READ, memoryview(bytes(list(pack('<H', addr))) + bytes(list(pack('<H', length)))), echo)
 +       if r:
 +         dat, r = self.RxPacket(echo)
 +         if r:
 +           return bytes(dat[9:-2]), (id == dat[4])
 +     return None, False
 +
 +  def ReadByteData(self, id : int, addr : int, echo = False) -> int:
 +   dat, r = self.ReadBlockData(id, addr, 1, echo)
 +   if r:
 +     return int(dat[0])
 +   return None
 +
 +  def ReadWordData(self, id : int, addr : int, echo = False) -> int:
 +   dat, r = self.ReadBlockData(id, addr, 2, echo)
 +   if r:
 +     return unpack('<H', dat[0:2])[0]
 +   return None
 +
 +  def ReadLongData(self, id : int, addr : int, echo = False) -> int:
 +   dat, r = self.ReadBlockData(id, addr, 4, echo)
 +   if r:
 +     return unpack('<l', dat[0:4])[0]
 +   return None
 +
 +  def WriteSyncData(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool:
 +   with self.__mutex:
 +     param = bytes(pack('<H', addr)) + bytes(pack('<H', length))
 +     for d in id_datas:
 +       param += bytes((d.id,)) + d.data
 +     _, r = self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, memoryview(param), echo)
 +     del param
 +     return r
 +
 +  def ReadSyncData(self, addr : int, length : int, ids : (int), echo = False) -> tuple:
 +   """
 +   Sync Read instruction
 +
 +   parameters
 +   -------------
 +   addr : int
 +     Target item address
 +   length : int
 +     Number of bytes to read
 +   ids : Tuple[int]
 +     Data to be written
 +
 +   Returns
 +   -------
 +   bytes | None
 +     Data read
 +   bool
 +     Success or failure
 +   """
 +   with self.__mutex:
 +     result = ()
 +     _, r = self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_READ, memoryview(bytes(pack('<H', addr)) + bytes(pack('<H', length)) + bytes(ids)), echo)
 +     if r:
 +       for id in ids:
 +         dat, r = self.RxPacket(echo)
 +         if r:
 +           result += (id, bytes(dat[9:9 + length])),
 +         else:
 +           result += (id, bytes([])),
 +     return result
 +   return None
 +
 +  def WriteBulkData(self, data : (TBulkW), echo = False) -> bool:
 +   with self.__mutex:
 +     param = bytes()
 +     for d in data:
 +       param += pack('B', d.id) + pack('<H', d.addr) + pack('<H', len(d.data)) + d.data
 +     _, r = self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_WRITE, memoryview(param), echo)
 +     del param
 +     return r
 +
 +  def ReadBulkData(self, data:(TBulkR), echo = False) -> tuple:
 +   with self.__mutex:
 +     result = ()
 +     param = bytes()
 +     for d in data:
 +       param += bytes(pack('B', d.id)) + bytes(pack('<H', d.addr)) + bytes(pack('<H', d.length))
 +     _, r = self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_READ, memoryview(param), echo)
 +     del param
 +     if r:
 +       for d in data:
 +         rxd, r = self.RxPacket(echo)
 +         if r:
 +           if(d.id == rxd[4]):
 +             result += (d.id, bytes(rxd[9:-2])),
 +           else:
 +             result += (d.id, bytes([])),
 +         else:
 +           result += (d.id, bytes([])),
 +     return result
 +
 +  def Ping(self, id : int, echo = False) -> bool:
 +   with self.__mutex:
 +     _, r = self.TxPacket(id, self.INST_PING, bytes(), echo)
 +     if r:
 +       rxd, r = self.RxPacket(echo)
 +       return r and (id == rxd[4])
 +     return False
 +
 +  def Reset(self, id : int, echo = False) -> bool:
 +   with self.__mutex:
 +     _, r = self.TxPacket(id, self.INST_RESET, bytes(0xff), echo)
 +     if r:
 +       dat, r = self.RxPacket(echo)
 +       return r and (id == dat[4]) and ((dat[8] & 0x7f) == 0)
 +     return False
 +
 +  def Reboot(self, id : int, echo = False) -> bool:
 +   with self.__mutex:
 +     _, r = self.TxPacket(id, self.INST_REBOOT, bytes(), echo)
 +     if r:
 +       dat, r = self.RxPacket(echo)
 +       return r and (id == dat[4]) and ((dat[8] & 0x7f) == 0)
 +     return False
 +
 +  def B2B(self, d):
 +   if isinstance(d, list) or isinstance(d, tuple):
 +     r = ()
 +     for d in d:
 +       r += tuple(pack('<B', d))
 +     return bytes(r)
 +   else:
 +     return bytes(pack('<B', d))
 +
 +  def W2B(self, d):
 +   if isinstance(d, list) or isinstance(d, tuple):
 +     r = ()
 +     for d in d:
 +       r += tuple(pack('<H', d))
 +     return bytes(r)
 +   else:
 +     return bytes(pack('<H', d))
 +
 +  def L2B(self, d):
 +   if isinstance(d, list) or isinstance(d, tuple):
 +     r = ()
 +     for d in d:
 +       r += tuple(pack('<I', d))
 +     return bytes(r)
 +   else:
 +     return bytes(pack('<I', d))
 +
 +if __name__ == "__main__":
 +  from contextlib import contextmanager
 +  from threading import Thread
 +  import time, gc, psutil, os
 +
 +
 +  #dx = dx2lib('/dev/ttySC2', 1000000)
 +  dx = dx2lib('\\\\.\\COM12', 3000000)
 +  echo = False
 +  ID = 1
 +  fin = False
 +
 +  @contextmanager
 +  def stopwatch():
 +   start_time = time.time()
 +   yield
 +   print('..proc time={:.1f}ms {}'.format((time.time() - start_time)*1000, psutil.Process(os.getpid()).memory_info(​).rss))
 +
 +  def func1():
 +   global dx, ID, fin, echo
 +
 +   try:
 +     '''
 +     """ reset dxl """
 +     with stopwatch():
 +       r = dx.Reset(ID, echo)
 +     print(f'Reset({ID}): {r}')
 +     time.sleep(0.5)
 +     '''
 +
 +     """ ping dxl """
 +     print(f'Ping({ID}):', dx.Ping(ID, echo))
 +
 +     """ reboot dxl """
 +     with stopwatch():
 +       r = dx.Reboot(ID, echo)
 +     print(f'Reboot({ID}): {r}')
 +     time.sleep(0.5)
 +
 +     """ basic packet proc """
 +     with stopwatch():
 +       with dx.mutex:
 +         r0 = dx.TxPacket(ID, dx.INST_WRITE, (65, 0, 1))
 +         r1 = dx.RxPacket()
 +     print(f'TxPacket({ID}): {r0[1]}', r0[0].hex(','))
 +     print(f'RxPacket({ID}): {r1[1]}', r1[0].hex(','))
 +
 +     """ read byte item """
 +     with stopwatch():
 +       r = dx.ReadByteData(ID, 65, echo)
 +     print(f'ReadByteData({ID}): {r}')
 +
 +     """ sync read inst. """
 +     print('ReadSyncData:')
 +     d = dx.ReadSyncData(0, 4, (1, 2, 3), True)
 +     for d in d:
 +       print(f' ({d[0]})', d[1].hex(','))
 +
 +     """ sync write inst. """
 +     time.sleep(2)
 +     for i in range(20):
 +       with stopwatch():
 +         dx.WriteSyncData(65, 1, (dx.TSyncW(1,dx.B2B(1)),dx.TSyncW(2,dx.B​2B(1)),dx.TSyncW(3,dx.B2B(1))), echo)
 +       time.sleep(0.05)
 +       with stopwatch():
 +         dx.WriteSyncData(65, 1, (dx.TSyncW(1,dx.B2B(0)),dx.TSyncW(2,dx.B​2B(0)),dx.TSyncW(3,dx.B2B(0))), echo)
 +       time.sleep(0.05)
 +
 +     """ set goal position """
 +     #torque off
 +     if dx.WriteByteData(ID, 64, 0, echo):
 +       #multi turn
 +       if dx.WriteByteData(ID, 11, 4, echo):
 +         #torque on
 +         if dx.WriteByteData(ID, 64, 1, echo):
 +           #set goal position
 +           for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)):
 +             if dx.WriteLongData(ID, 116, gp, echo):
 +               #get present position
 +               with stopwatch():
 +                 pp = unpack('>l', pack('>l', dx.ReadLongData(ID, 132, echo)))[0]
 +               print(f'{gp:6} {pp:6}')
 +               time.sleep(0.05)
 +             else:
 +               break
 +           #torque off
 +           dx.WriteByteData(ID, 64, 0, echo)
 +           #normal turn
 +           dx.WriteByteData(ID, 11, 3, echo)
 +           print('')
 +
 +     """ test add/remove suffixes """
 +     fffffd = array.array('B',(
 +       0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
 +       0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
 +       0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
 +     ))
 +     with stopwatch():
 +       r = dx.WriteBlockData(ID, 634, fffffd, echo)
 +     print(f'WriteBlockData({ID}): {r}')
 +     with stopwatch():
 +       r = dx.ReadBlockData(ID, 634, len(fffffd), echo)
 +     print(f'ReadBlockData({ID}): {r[1]}', r[0].hex(','))
 +
 +     """ bulk read inst. """
 +     with stopwatch():
 +       d = dx.ReadBulkData((dx.TBulkR(1,0,10), dx.TBulkR(2,0,20), dx.TBulkR(3,0,30), dx.TBulkR(4,0,40)), echo)
 +     for d in d:
 +       print(f' ({d[0]},0)', d[1].hex(','))
 +
 +     """ bulk write inst. """
 +     dx.WriteByteData(ID, 64, 1, echo)
 +     with stopwatch():
 +       dx.WriteBulkData((dx.TBulkW(1,104, dx.L2B((0,0,0,1024))), dx.TBulkW(2,65,dx.B2B(0)), dx.TBulkW(3,65,dx.B2B(0))), echo)
 +     time.sleep(0.5)
 +     with stopwatch():
 +       dx.WriteBulkData((dx.TBulkW(1,104, dx.L2B(0)+dx.L2B(0)+dx.L2B(0)+dx.L2B(204​8)), dx.TBulkW(2,65,dx.B2B(1)), dx.TBulkW(3,65,dx.B2B(1))), echo)
 +     time.sleep(0.5)
 +     with stopwatch():
 +       dx.WriteBulkData((dx.TBulkW(1,104, dx.L2B(0)+dx.L2B(0)+dx.L2B(0)+dx.L2B(102​4)), dx.TBulkW(2,65,dx.B2B(0)), dx.TBulkW(3,65,dx.B2B(0))), echo)
 +     time.sleep(0.5)
 +     with stopwatch():
 +       dx.WriteBulkData((dx.TBulkW(1,116, dx.L2B(2048)), dx.TBulkW(2,65,dx.B2B(1)), dx.TBulkW(3,65,dx.B2B(1))), echo)
 +     time.sleep(0.5)
 +     with stopwatch():
 +       dx.WriteBulkData((dx.TBulkW(1,65,bytes(0​)),dx.TBulkW(2,65,bytes(0)),dx.TBulkW(3,​65,bytes(0))), echo)
 +     dx.WriteByteData(ID, 64, 0, echo)
 +
 +   except Exception as ex:
 +     import traceback
 +     print('--- Caught Exception ---')
 +     traceback.print_exc()
 +     print('------------------------')
 +
 +   fin = True
 +
 +  def func2():
 +   global dx, ID, fin, echo
 +
 +   time.sleep(1)
 +   while not fin:
 +     try:
 +       dx.WriteByteData(ID, 65, dx.ReadByteData(ID, 65, echo) ^ 1, echo)
 +       time.sleep(0.05)
 +     except Exception as ex:
 +       import traceback
 +       print('--- Caught Exception ---')
 +       traceback.print_exc()
 +       print('------------------------')
 +       return
 +
 +  th1 = Thread(target=func1)
 +  th2 = Thread(target=func2)
 +  th1.start()
 +  th2.start()
 +  th1.join()
 +  th2.join()
 +  print('fin')
 +</pre>
 +}}


トップ   差分 リロード印刷に適した表示   全ページ一覧 単語検索 最新ページの一覧   最新ページのRSS 1.0 最新ページのRSS 2.0 最新ページのRSS Atom