現: 2024-04-22 (月) 14:34:47 takaboo ソース
Line 1: Line 1:
 +TITLE:KO B3M Python Library
 +**B3Mシリーズ [#wb9f3fea]
 +[[B3Mシリーズ:https://kondo-robot.com/product-categor​y/servomotor/b3m]]~
 +[[B3Mソフトウェアマニュアル>https://kondo-robot.com/faq/b3m_softwar​emanual1200]]
 +**テストコード [#xee3e202]
 +Python前提。~
 +B3Mの挙動の再現性が乏しくなることが頻出するので、正直なところ参考程度に考えておいてほしい。
 +***Python[#mdb668e2]
 +B3Mはパケット構成が緩いので、ファームウェアが想定していない条件で通信を行うと重篤な事態に陥りやすい。~
 +WRITEとPOSITIONのみマルチに対応したコマンドとしてWRITEsとPOSITIONsを用意。~
 +ERROR STATUSをどのように成否判定に使うべきかが判然としないので無視。
 +
 +#html{{
 +<style type="text/css">
 +    .syntaxhighlighter {
 +     overflow-y: auto !important;
 +     overflow-x: auto !important;
 +     max-height: 900px;
 +     -webkit-text-size-adjust: 100%;
 +    }
 +</style>
 +<pre class="brush: python;"  title="pyB3M.py">
 +#!/usr/bin/python3
 +# -*- coding: utf-8 -*-
 +#
 +# pyB3M.py
 +# SPDX-License-Identifier: MIT
 +# SPDX-FileCopyrightText: (C) 2024 mukyokyo
 +
 +import serial, threading, array, struct
 +from typing import Union
 +from collections import namedtuple
 +from struct import pack, unpack, iter_unpack
 +from typing import overload
 +
 +########################################​##################
 +# Functionalized the part of converting int to bytes.
 +# If specified as a tuple, it is converted to bytes at once.
 +########################################​##################
 +def B2Bs(d) -> bytes:
 +  if isinstance(d, list) or isinstance(d, tuple):
 +   return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d)
 +  else:
 +   return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d))
 +
 +def W2Bs(d) -> bytes:
 +  if isinstance(d, list) or isinstance(d, tuple):
 +   return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]])
 +  else:
 +   return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d))
 +
 +def L2Bs(d) -> bytes:
 +  if isinstance(d, list) or isinstance(d, tuple):
 +   return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]])
 +  else:
 +   return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d))
 +
 +########################################​##################
 +# API for Kondo B3M
 +########################################​##################
 +class B3MProtocol:
 +  BROADCASTING_ID  = 0xff
 +
 +  OPT_LOAD    = 1
 +  OPT_SAVE    = 2
 +  OPT_READ    = 3
 +  OPT_WRITE    = 4
 +  OPT_RESET    = 5
 +  OPT_POSITION = 6
 +
 +  (SYSW_BAUD_57600, SYSW_BAUD_115_2k, SYSW_BAUD_625k, SYSW_BAUD_1M, SYSW_BAUD_1_25M, SYSW_BAUD_1_5M, SYSW_BAUD_2M, SYSW_BAUD_3M) = range(8)
 +  (SYSW_PARITY_NONE, SYSW_PARITY_ODD, SYSW_PARITY_EVEN) = range(3)
 +  (MOTW_OPT_NONE, MOTW_OPT_TORQUEON, MOTW_OPT_FREE, _, MOTW_OPT_BRAKE, _, _, _, MOTW_OPT_HOLD) = range(9)
 +
 +  def __init__(self, port : Union[serial.Serial, str], baudrate = 57600, timeout = 0.05, lock = None):
 +   if isinstance(port, serial.Serial):
 +     self.__serial = port
 +     self.__baudrate = port.baudrate
 +     self.__timeout = port.timeout
 +   else:
 +     self.__serial = serial.Serial(port, baudrate = baudrate, timeout = timeout)
 +     self.__baudrate = self.__serial.baudrate
 +     self.__timeout = self.__serial.timeout
 +
 +   if lock == None:
 +     self.__lock = threading.Lock()
 +   else:
 +     self.__lock = lock
 +   self.__status = 0
 +
 +  def __del__(self):
 +   pass
 +
 +  @property
 +  def lock(self):
 +   return self.__lock
 +
 +  @property
 +  def baudrate(self):
 +   return self.__serial.baudrate
 +
 +  @baudrate.setter
 +  def baudrate(self, baudrate):
 +   self.__baudrate = baudrate
 +   self.__serial.baudrate = baudrate
 +
 +  @property
 +  def timeout(self):
 +   return self.__serial.timeout
 +
 +  @timeout.setter
 +  def timeout(self, timeout):
 +   self.__timeout = timeout
 +   self.__serial.timeout = timeout
 +
 +  def __reconfig(self):
 +   self.__serial.baudrate = self.__baudrate
 +   self.__serial.timeout = self.__timeout
 +
 +  @property
 +  def status(self):
 +   return self.__status
 +
 +  def TxPacket(self, cmd : int, opt : int, param : bytes, echo = False) -> (bytes, bool):
 +   self.__reconfig()
 +   if cmd >= 0 and cmd <= 0xff and opt >= 0 and opt <= 0xff and len(param) <= (256 - 5):
 +     txp = bytes((len(param) + 4, cmd, opt)) + bytes(param)
 +     txp += B2Bs(sum(txp) & 0xff)
 +     self.__serial.reset_input_buffer()
 +     if echo: print('TX:', txp.hex(':'))
 +     self.__serial.write(txp)
 +     return txp, True
 +   return None, False
 +
 +  def __rx(self, length) -> bytes:
 +   s = self.__serial.read(length)
 +   l = len(s)
 +   if l == length:
 +     return s
 +   else:
 +     r = s
 +     length -= l
 +     while self.__serial.in_waiting > 0:
 +       s = self.__serial.read(length)
 +       r += s
 +       length -= len(s)
 +       if length <= 0:
 +         break
 +     return r
 +
 +  def RxPacket(self, echo = False) -> (bytes, bool):
 +   rxp = self.__rx(1)
 +   if rxp:
 +     if len(rxp) == 1:
 +       if rxp[0] >= 5:
 +         l = rxp[0] - 1
 +         rxp += self.__rx(l)
 +         if rxp[1] >= 0x81 and rxp[1] <= 0x86 and rxp[3] <= 0xfe and rxp[-1] == (sum(rxp[:-1]) & 0xff):
 +           self.__status = rxp[2]
 +           if echo: print('RX:', rxp.hex(':'))
 +           return bytes(rxp), True
 +   if echo: print('RX;', rxp.hex(';'), ' xxx')
 +   return None, False
 +
 +  def WRITE(self, id : int, addr : int, data : bytes, reset = False, echo = False) -> bool:
 +   with self.__lock:
 +     if id >= 0 and id <= 255 and addr >= 0 and addr <= 0xff and len(data) <= 256 - 7:
 +       if self.TxPacket(self.OPT_WRITE, 0x80 if reset else 0x00, B2Bs(id) + data + B2Bs(addr) + B2Bs(1), echo)[1]:
 +         if id != self.BROADCASTING_ID:
 +           d, r = self.RxPacket(echo)
 +           if r:
 +             return (d[3] == id) and (d[1] == (self.OPT_WRITE | 0x80))
 +         else:
 +           return True
 +     return False
 +
 +  def WRITEs(self, addr : int, iddatas : Union[list, tuple], reset = False, echo = False) -> bool:
 +   with self.__lock:
 +     if addr >= 0 and addr <= 0xff and (isinstance(iddatas, list) or isinstance(iddatas, tuple)):
 +       cnt = len(iddatas)
 +       if cnt > 0:
 +         txd = bytes(0)
 +         l = len(iddatas[0])
 +         for iddata in iddatas:
 +           if iddata[0] < 0 or iddata[0] > 254 or len(iddata) != l:
 +             return False
 +           txd += iddata
 +         txd +=  B2Bs(addr) + B2Bs(cnt)
 +         return self.TxPacket(self.OPT_WRITE, 0x80 if reset else 0x00, txd, echo)[1]
 +     return False
 +
 +  def WRITE8(self, id : int, addr : int, data : Union[int, list, tuple], reset = False, echo = False) -> bool:
 +   return self.WRITE(id, addr, B2Bs(data), reset, echo)
 +
 +  def WRITE16(self, id : int, addr : int, data : Union[int, list, tuple], reset = False, echo = False) -> bool:
 +   return self.WRITE(id, addr, W2Bs(data), reset, echo)
 +
 +  def WRITE32(self, id : int, addr : int, data : Union[int, list, tuple], reset = False, echo = False) -> bool:
 +   return self.WRITE(id, addr, L2Bs(data), reset, echo)
 +
 +  def READ(self, id : int, addr : int, length : int, reset = False, echo = False) -> bytes:
 +   with self.__lock:
 +     if addr >= 0 and id <= 254 and addr >= 0 and addr <= 0xff and length >= 1 and length <= 250:
 +       if self.TxPacket(self.OPT_READ, 0x80 if reset else 0x00, B2Bs(id) + B2Bs(addr) + B2Bs(length), echo)[1]:
 +         d, r = self.RxPacket(echo)
 +         if r:
 +           if d[1] == (self.OPT_READ | 0x80) and id == d[3] and len(d) == length + 5:
 +             return bytes(d[4:-1])
 +     return None
 +
 +  def READ8(self, id : int, addr : int, length = 1, signed = False, reset = False, echo = False) -> int:
 +   r = self.READ(id, addr, length, reset, echo)
 +   if r != None:
 +     n = sum(iter_unpack('b' if signed else 'B', r), ())
 +     return n if length > 1 else n[0]
 +   return None
 +
 +  def READ16(self, id : int, addr : int, length = 1, signed = False, reset = False, echo = False) -> int:
 +   r = self.READ(id, addr, 2 * length, reset, echo)
 +   if r != None:
 +     n = sum(iter_unpack('h' if signed else 'H', r), ())
 +     return n if length > 1 else n[0]
 +   return None
 +
 +  def READ32(self, id : int, addr : int, length = 1, signed = False, reset = False, echo = False) -> int:
 +   r = self.READ(id, addr, 4 * length, reset, echo)
 +   if r != None:
 +     n = sum(iter_unpack('i' if signed else 'I', r), ())
 +     return n if length > 1 else n[0]
 +   return None
 +
 +  def LOAD(self, id : int, echo = False) -> bool:
 +   with self.__lock:
 +     if self.TxPacket(self.OPT_LOAD, 0x0, B2Bs(id), echo)[1]:
 +       if id != self.BROADCASTING_ID:
 +         d, r = self.RxPacket(echo)
 +         if r:
 +           return id == d[3] and d[1] == (self.OPT_LOAD | 0x80) and len(d) == 5
 +       else:
 +         return True;
 +   return False
 +
 +  def SAVE(self, id : int, echo = False) -> bool:
 +   with self.__lock:
 +     if self.TxPacket(self.OPT_SAVE, 0x0, B2Bs(id), echo)[1]:
 +       if id != self.BROADCASTING_ID:
 +         d, r = self.RxPacket(echo)
 +         if r:
 +           return id == d[3] and d[1] == (self.OPT_SAVE| 0x80) and len(d) == 5
 +       else:
 +         return True;
 +     return False
 +
 +  def POSITION(self, id : int, pos : int, tim : int, reset = False, echo = False) -> int:
 +   with self.__lock:
 +     if pos >= -32000 and pos <= 32000 and tim >= 0 and tim <= 65535:
 +       if self.TxPacket(self.OPT_POSITION, 0x80 if reset else 0x00, B2Bs(id) + W2Bs(pos) + W2Bs(tim), echo):
 +         if id != self.BROADCASTING_ID:
 +           dat, r = self.RxPacket(echo)
 +           if r:
 +             if dat[3] == id and len(dat) == 7:
 +               return tuple(n[0] for n in iter_unpack('<h', dat[4:6]))[0]
 +           else:
 +             return None
 +         else:
 +           return None
 +     return None
 +
 +  def POSITIONs(self, idposs : Union[list, tuple], tim : int, reset = False, echo = False) -> bool:
 +   with self.__lock:
 +     if tim >= 0 and tim <= 65535:
 +       cnt = len(idposs)
 +       if cnt > 0:
 +         txd = bytes(0)
 +         l = len(idposs[0])
 +         for idpos in idposs:
 +           if len(idpos) != 2:
 +             return False
 +           if idpos[0] < 0 or idpos[0] > 254 or len(idpos) != l or idpos[1] < -32000 or idpos[1] > 32000:
 +             return False
 +           txd += B2Bs(idpos[0]) + W2Bs(idpos[1])
 +         txd += W2Bs(tim)
 +         return self.TxPacket(self.OPT_POSITION, 0x80 if reset else 0x00, txd, echo)[1]
 +     return False
 +
 +  def RESET(self, id : int, tim : int, echo = False) -> bool:
 +   with self.__lock:
 +     if ((id >= 0 and id <= 255) or id == self.BROADCASTING_ID) and  tim >= 0 and tim <= 255:
 +       if self.TxPacket(self.OPT_RESET, 0x0, B2Bs(id) + B2Bs(tim), echo)[1]:
 +         return True
 +   return False
 +
 +  def Ping(self, id : int, echo = False) -> bool:
 +   if id >= 0 and id <= 254:
 +     for i in range(3):
 +       r = self.READ8(id, 0, 1, False, echo)
 +       if r != None: return True
 +   return False
 +
 +########################################​##################
 +# test code
 +########################################​##################
 +if __name__ == "__main__":
 +  import time
 +  from pyB3M import *
 +
 +  try:
 +   b3m = B3MProtocol('/dev/ttyAMA0', 1000000, 1)
 +  except:
 +   pass
 +  else:
 +   ID = 0
 +   ec = True
 +
 +   # r/w test
 +   print('reset=',b3m.RESET(ID, 0, echo = ec))
 +   time.sleep(0.5)
 +   print('load=',b3m.LOAD(ID, echo = ec))
 +   print('save=',b3m.SAVE(ID, echo = ec))
 +   print('read 5~9=',b3m.READ(ID, 0, 5, echo = ec).hex(','))
 +   print('read32 0=',b3m.READ32(ID, 0, echo = ec))
 +   print('read16 5=',b3m.READ16(ID, 5, echo = ec))
 +   print('read16 7=',b3m.READ16(ID, 7, echo = ec))
 +   print('read16 0x50=',b3m.READ16(ID, 0x50, echo = ec))
 +
 +   # r/w pid gain
 +   gain = list(b3m.READ16(ID, 0x5e, 3, echo = ec))
 +   print('pid r=',gain)
 +   print('pid w=',b3m.WRITE16(ID, 0x5e, gain, echo = ec))
 +   print('pid r=',b3m.READ16(ID, 0x5e, 3, echo = ec))
 +
 +   # write (multi)
 +   d = (B2Bs(ID)+W2Bs(gain), B2Bs(ID+1)+W2Bs(gain), B2Bs(ID+2)+W2Bs(gain), B2Bs(ID+3)+W2Bs(gain),)
 +   print(d)
 +   print('writes=',b3m.WRITEs(0x5e, d, echo = ec))
 +
 +   # motor ctrl
 +   #  set free
 +   if b3m.WRITE8(ID, 0x28, 2, echo = ec):
 +     #  sel pos mode
 +     if b3m.WRITE8(ID, 0x28, 2, echo = ec):
 +       #  sel profile
 +       if b3m.WRITE8(ID, 0x29, 1, echo = ec):
 +         #  set normal
 +         if b3m.WRITE8(ID, 0x28, 0, echo = ec):
 +           #  position
 +           for ang in (0, 4500, 9000, 4500, 0, -4500, -9000, -4500, 0):
 +             b3m.POSITION(ID, ang, 500)
 +             s = time.time() + 1
 +             while s > time.time():
 +               r = b3m.READ(ID, 0x27, 48)
 +               if r != None:
 +                 n = tuple(iter_unpack('<BHBBhhhhhhHHHhIHhhHH​HHiB', r))[0]
 +                 print(f'READ(0x27~0x56)={ang:7d}{n[5]:7d​}{n[8]:7d} {n[16]:5d}{n[17]:5d}{n[18]:7d}{n[19]:7d}​', end='\r')
 +               else:
 +                 print('READ(0x27~0x56)=xxxxx', end='\r')
 +         else:
 +           print('err:set normal')
 +       else:
 +         print('err:set profile')
 +     else:
 +       print('err:set pos mode')
 +   else:
 +     print('err:set free')
 +   print()
 +   # set free
 +   b3m.WRITE8(ID, 0x28, 2)
 +
 +   del b3m
 +
 +  print('fin')
 +</pre>
 +}}
 +
 +****全検索 [#w7b4050b]
 +IDやボーレートが分からない場合の検索。[[kbhit>https://gist.github.com/michelbl​/efda48b19d3e587685e3441a74457024]]要。~
 +B3M側の設定と異なるボーレートで疎通確認を行うため、B3Mの内部エラーが誘発されるのと、B3Mがパケットを誤認して設定が変わってしまうなどのリスクがあるので、あまり使うべきではない。
 +#html{{
 +<pre class="brush: python;"  title="scan.py">
 +#!/usr/bin/python3
 +#
 +# scan.py
 +
 +import sys, kbhit, time
 +from pyB3M import B3MProtocol
 +
 +kb = kbhit.KBHit()
 +try:
 +  b3m = B3MProtocol(sys.argv[1] if len(sys.argv) == 2 else '/dev/ttyAMA0', 115200, 0.02)
 +  for b in (115200, 1000000, 1250000, 1500000, 2000000, 3000000):
 +   b3m.baudrate = b
 +   for id in range(255):
 +     if kb.kbhit():
 +       k = kb.getch()
 +       break
 +     print(f' baud:{b:7} id:{id:3} ', end='find\n' if b3m.Ping(id) else '\r', flush=True)
 +   time.sleep(0.5)
 +  sys.stdout.write('\033[2K\033[1G')
 +except:
 +  print(' ERR:There is some problem.')
 +</pre>
 +}}
 +
 +****ID変更 [#e74d62c1]
 +任意のIDへ変更。
 +#html{{
 +<pre class="brush: python;"  title="changeid.py">
 +#!/usr/bin/python3
 +#
 +# changeid.py
 +
 +# Note that since processing begins after LOAD is performed,
 +# various parameters are the same as immediately after startup.
 +
 +import os, sys
 +from pyB3M import B3MProtocol
 +
 +baudlist = { 115200 : 0, 1000000 : 1, 1250000 : 2, 1500000 : 3, 2000000 : 4, 3000000 : 5 }
 +
 +if len(sys.argv) == 4:
 +  arg = sys.argv[1:]
 +  dev = '/dev/ttyAMA0'
 +elif len(sys.argv) == 5:
 +  arg = sys.argv[2:]
 +  dev = sys.argv[1]
 +else:
 +  arg = []
 +  dev = ''
 +
 +if not os.path.exists(dev):
 +  dev = ''
 +
 +if dev != '':
 +  try:
 +   baud = int(arg[0])
 +   baudind = baudlist[baud]
 +   previd = int(arg[1])
 +   newid = int(arg[2])
 +   b3m = B3MProtocol(dev, baud, timeout = 0.1)
 +  except:
 +   print(' ERR:There is some problem.')
 +  else:
 +   if newid >= 0 and newid <= 254 and previd >= 0 and previd <= 254:
 +     # Spell to eliminate the habit of poor readout of B3M
 +     b3m.Ping(previd)
 +     # Directs LOAD to the previous ID regardless of its presence or absence.
 +     b3m.LOAD(previd)
 +     # Confirm that the ID to be changed does not exist
 +     if not b3m.Ping(newid):
 +       # I don't know if it's necessary, but I'll put it in a free state.
 +       if b3m.WRITE8(previd, 0x28, 2):
 +         # Write new ID (It is an error because the response is changed with the new ID.)
 +         dum = b3m.WRITE8(previd, 0, newid)
 +         if b3m.Ping(newid):
 +           # SAVE
 +           if b3m.SAVE(newid):
 +             print(' OK')
 +           else:
 +             print(' NG -1')
 +         else:
 +           print(' NG -2')
 +       else:
 +         print(f' ERR: Device with ID:{previd} not found with BAUDRATE:{baud}')
 +     else:
 +       print(' ERR:Competing.')
 +   else:
 +     print(' ERR:There is some problem.')
 +else:
 +  print(' usage: changeid [line] <baudrate> <prev id> <new id>')
 +</pre>
 +}}
 +
 +****ボーレート変更 [#r468c92e]
 +任意のボーレートへ変更。~
 +セオリー通りの手順なのだが、手持ちのB3Mでは保存されないことが頻出して嫌気がさしている。
 +
 +#html{{
 +<pre class="brush: python;"  title="changebaud.py">
 +#!/usr/bin/python3
 +#
 +# changebaud.py
 +
 +# Note that since processing begins after LOAD is performed,
 +# various parameters are the same as immediately after startup.
 +
 +import os, sys
 +from pyB3M import B3MProtocol
 +
 +baudlist = { 115200 : 0, 1000000 : 1, 1250000 : 2, 1500000 : 3, 2000000 : 4, 3000000 : 5 }
 +
 +if len(sys.argv) == 4:
 +  arg = sys.argv[1:]
 +  dev = '/dev/ttyAMA0'
 +elif len(sys.argv) == 5:
 +  arg = sys.argv[2:]
 +  dev = sys.argv[1]
 +else:
 +  arg = []
 +  dev = ''
 +
 +if not os.path.exists(dev):
 +  dev = ''
 +
 +if dev != '':
 +  try:
 +   prevbaud = int(arg[0])
 +   prevind = baudlist[prevbaud]
 +   id = int(arg[1])
 +   newbaud = int(arg[2])
 +   newind = baudlist[newbaud]
 +   b3m = B3MProtocol(dev, newbaud, timeout = 0.1)
 +  except:
 +   print(' ERR: There is some problem.')
 +  else:
 +   # Verify that no devices exist at the changed baud rate
 +   if not b3m.Ping(id):
 +     b3m.baudrate = prevbaud
 +     # Spell to eliminate the habit of poor readout of B3M
 +     b3m.Ping(id)
 +     # Directs LOAD to the previous ID regardless of its presence or absence.
 +     b3m.LOAD(id)
 +     # I don't know if it's necessary, but I'll put it in a free state.
 +     if b3m.WRITE8(id, 0x28, 2):
 +       # Write new baudrate
 +       dum = b3m.WRITE32(id, 1, newbaud)
 +       if b3m.Ping(id):
 +         # SAVE
 +         if b3m.SAVE(id):
 +           b3m.RESET(id, 0)
 +           b3m.baudrate = newbaud
 +           if b3m.Ping(id):
 +             print(' OK')
 +           else:
 +             print(' NG -1')
 +         else:
 +           print(' NG -2')
 +       else:
 +         print(' NG -3')
 +     else:
 +       print(f' ERR: Device with ID:{id} not found with BAUDRATE:{prevbaud}')
 +   else:
 +     print(' ERR: Competing.')
 +else:
 +  print(' usage: changebaud [line] <baudrate> <id> <new baudrate>')
 +</pre>
 +}}
  

  • KONDO B3M のバックアップ差分(No. All)
    • 現: 2024-04-22 (月) 14:34:47 takaboo

トップ   差分 リロード印刷に適した表示   全ページ一覧 単語検索 最新ページの一覧   最新ページのRSS 1.0 最新ページのRSS 2.0 最新ページのRSS Atom