Page Top

テストコード

Python前提。
B3Mの挙動の再現性が乏しくなることが頻出するので、正直なところ参考程度に考えておいてほしい。

Page Top

Python

B3Mはパケット構成が緩いので、ファームウェアが想定していない条件で通信を行うと重篤な事態に陥りやすい。
WRITEとPOSITIONのみマルチに対応したコマンドとしてWRITEsとPOSITIONsを用意。
ERROR STATUSをどのように成否判定に使うべきかが判然としないので無視。

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# pyB3M.py
# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: (C) 2024 mukyokyo

import serial, threading, array, struct
from typing import Union
from collections import namedtuple
from struct import pack, unpack, iter_unpack
from typing import overload

##########################################################
# Functionalized the part of converting int to bytes.
# If specified as a tuple, it is converted to bytes at once.
##########################################################
def B2Bs(d) -> bytes:
  """Encode one integer, or a list/tuple of integers, as single bytes.

  A negative value is mapped to its low 7 bits with bit 7 forced on, which
  equals the two's-complement byte for -128..-1.  Non-negative values are
  used unchanged, so anything above 255 is rejected by the underlying
  bytes/struct conversion.
  """
  def _byte(v):
    if v < 0:
      return (v & 0x7f) | 0x80
    return v

  if not isinstance(d, (list, tuple)):
    return pack('<B', _byte(d))
  return bytes([_byte(v) for v in d])

def W2Bs(d) -> bytes:
  """Encode one integer, or a list/tuple of integers, as little-endian 16-bit words.

  A negative value is mapped to its low 15 bits with bit 15 forced on, which
  equals the two's-complement word for -32768..-1.  Non-negative values are
  packed unchanged as unsigned 16-bit.
  """
  def _word(v):
    if v < 0:
      return (v & 0x7fff) | 0x8000
    return v

  if not isinstance(d, (list, tuple)):
    return pack('<H', _word(d))
  # Convert every element first, then pack the whole run with one format.
  words = [_word(v) for v in d]
  return pack('<%dH' % len(words), *words)

def L2Bs(d) -> bytes:
  """Encode one integer, or a list/tuple of integers, as little-endian 32-bit words.

  A negative value is mapped to its low 31 bits with bit 31 forced on, which
  equals the two's-complement representation for -2**31..-1.  Non-negative
  values are packed unchanged as unsigned 32-bit.
  """
  def _dword(v):
    if v < 0:
      return (v & 0x7fffffff) | 0x80000000
    return v

  if not isinstance(d, (list, tuple)):
    return pack('<I', _dword(d))
  # Convert every element first, then serialise them one after another.
  dwords = [_dword(v) for v in d]
  return b''.join(pack('<I', v) for v in dwords)

##########################################################
# API for Kondo B3M
##########################################################
class B3MProtocol:
  """Serial-port client for the Kondo B3M servo command protocol.

  Every request is framed as ``SIZE, COMMAND, OPTION, payload..., SUM``
  where SIZE is the total packet length and SUM is the low byte of the sum
  of all preceding bytes.  A reply is accepted only when it has the length
  it announces, a command byte in 0x81..0x86, an ID byte of at most 0xfe
  and a matching checksum.  The third byte of the last accepted reply is
  kept in :attr:`status`; it is not used to decide success.

  Each command method holds :attr:`lock` for the whole request/reply
  exchange.
  """

  BROADCASTING_ID  = 0xff

  # Command codes.  Despite the OPT_ prefix, the methods below send these
  # as the COMMAND byte; a reply echoes the code with bit 7 set.
  OPT_LOAD     = 1
  OPT_SAVE     = 2
  OPT_READ     = 3
  OPT_WRITE    = 4
  OPT_RESET    = 5
  OPT_POSITION = 6

  # Enumerated setting values; none of them is referenced by this class.
  # NOTE(review): presumably servo register values for baud rate, parity
  # and motor mode - confirm against the B3M manual before relying on them.
  (SYSW_BAUD_57600, SYSW_BAUD_115_2k, SYSW_BAUD_625k, SYSW_BAUD_1M, SYSW_BAUD_1_25M, SYSW_BAUD_1_5M, SYSW_BAUD_2M, SYSW_BAUD_3M) = range(8)
  (SYSW_PARITY_NONE, SYSW_PARITY_ODD, SYSW_PARITY_EVEN) = range(3)
  (MOTW_OPT_NONE, MOTW_OPT_TORQUEON, MOTW_OPT_FREE, _, MOTW_OPT_BRAKE, _, _, _, MOTW_OPT_HOLD) = range(9)

  def __init__(self, port : "Union[serial.Serial, str]", baudrate = 57600, timeout = 0.05, lock = None):
    """Attach to a serial port.

    port     -- an already opened ``serial.Serial`` (used as is; *baudrate*
                and *timeout* are then ignored) or a device name to open.
    baudrate -- line speed used when *port* is a device name.
    timeout  -- read timeout in seconds used when *port* is a device name.
    lock     -- lock object shared with other users of the same port; a
                private ``threading.Lock`` is created when omitted.
    """
    if isinstance(port, serial.Serial):
      self.__serial = port
    else:
      self.__serial = serial.Serial(port, baudrate = baudrate, timeout = timeout)
    # Remember the settings so they can be re-applied before every transmission.
    self.__baudrate = self.__serial.baudrate
    self.__timeout = self.__serial.timeout

    self.__lock = threading.Lock() if lock is None else lock
    self.__status = 0

  def __del__(self):
    # No cleanup is performed; the serial port is left open.
    pass

  @property
  def lock(self):
    """Lock held by every command method during its exchange."""
    return self.__lock

  @property
  def baudrate(self):
    """Current baud rate of the underlying serial port."""
    return self.__serial.baudrate

  @baudrate.setter
  def baudrate(self, baudrate):
    self.__baudrate = baudrate
    self.__serial.baudrate = baudrate

  @property
  def timeout(self):
    """Current read timeout of the underlying serial port."""
    return self.__serial.timeout

  @timeout.setter
  def timeout(self, timeout):
    self.__timeout = timeout
    self.__serial.timeout = timeout

  def __reconfig(self):
    # Re-apply the remembered settings to the port.
    # NOTE(review): presumably needed when the Serial object is shared and
    # may have been reconfigured by another user - confirm.
    self.__serial.baudrate = self.__baudrate
    self.__serial.timeout = self.__timeout

  @property
  def status(self):
    """Status byte (third byte) of the most recently accepted reply."""
    return self.__status

  def TxPacket(self, cmd : int, opt : int, param : bytes, echo = False) -> (bytes, bool):
    """Frame and transmit one request.

    Returns ``(packet, True)`` after writing, or ``(None, False)`` when
    *cmd*/*opt* are not byte values or *param* is longer than 251 bytes
    (the SIZE field is a single byte).  Pending input is discarded before
    the write so that the next reply read belongs to this request.
    """
    self.__reconfig()
    if not (0 <= cmd <= 0xff and 0 <= opt <= 0xff and len(param) <= (256 - 5)):
      return None, False
    txp = bytes((len(param) + 4, cmd, opt)) + bytes(param)
    txp += B2Bs(sum(txp) & 0xff)
    self.__serial.reset_input_buffer()
    if echo: print('TX:', txp.hex(':'))
    self.__serial.write(txp)
    return txp, True

  def __rx(self, length) -> bytes:
    """Read up to *length* bytes; may return fewer if the port runs dry."""
    buf = self.__serial.read(length)
    remaining = length - len(buf)
    # The first read may time out with a partial result; keep collecting
    # as long as the driver still reports buffered bytes.
    while remaining > 0 and self.__serial.in_waiting > 0:
      chunk = self.__serial.read(remaining)
      buf += chunk
      remaining -= len(chunk)
    return buf

  def RxPacket(self, echo = False) -> (bytes, bool):
    """Receive and validate one reply.

    Returns ``(packet, True)`` for a well-formed reply and updates
    :attr:`status`; returns ``(None, False)`` on timeout, truncation or
    any framing/checksum mismatch.
    """
    rxp = self.__rx(1)
    if rxp and rxp[0] >= 5:
      rxp += self.__rx(rxp[0] - 1)
      # The length test must come first: a reply cut short by the timeout
      # would otherwise be indexed past its end.
      if (len(rxp) == rxp[0]
          and 0x81 <= rxp[1] <= 0x86
          and rxp[3] <= 0xfe
          and rxp[-1] == (sum(rxp[:-1]) & 0xff)):
        self.__status = rxp[2]
        if echo: print('RX:', rxp.hex(':'))
        return bytes(rxp), True
    if echo: print('RX;', rxp.hex(';'), ' xxx')
    return None, False

  def WRITE(self, id : int, addr : int, data : bytes, reset = False, echo = False) -> bool:
    """Write *data* to address *addr* of servo *id*.

    Option byte is 0x80 when *reset* is true, else 0x00.  Returns True
    when the servo acknowledges with its own ID (or immediately for the
    broadcast ID, which gets no reply), False otherwise.
    """
    with self.__lock:
      # SIZE, COMMAND, OPTION, ID, ADDRESS, COUNT and SUM add 7 bytes to the data.
      if not (0 <= id <= 255 and 0 <= addr <= 0xff and len(data) <= 255 - 7):
        return False
      if not self.TxPacket(self.OPT_WRITE, 0x80 if reset else 0x00, B2Bs(id) + data + B2Bs(addr) + B2Bs(1), echo)[1]:
        return False
      if id == self.BROADCASTING_ID:
        return True
      d, r = self.RxPacket(echo)
      return r and (d[3] == id) and (d[1] == (self.OPT_WRITE | 0x80))

  def WRITEs(self, addr : int, iddatas : Union[list, tuple], reset = False, echo = False) -> bool:
    """Write to address *addr* of several servos in one packet.

    *iddatas* is a list/tuple of equally long byte strings, each starting
    with the target ID (0..254) followed by that servo's data.  No reply
    is read; the return value only says whether the packet was sent.
    """
    with self.__lock:
      if not (0 <= addr <= 0xff and isinstance(iddatas, (list, tuple)) and len(iddatas) > 0):
        return False
      width = len(iddatas[0])
      # Every entry needs at least the ID byte plus one data byte.
      if width < 2:
        return False
      for iddata in iddatas:
        if len(iddata) != width or iddata[0] < 0 or iddata[0] > 254:
          return False
      txd = b''.join(iddatas) + B2Bs(addr) + B2Bs(len(iddatas))
      return self.TxPacket(self.OPT_WRITE, 0x80 if reset else 0x00, txd, echo)[1]

  def WRITE8(self, id : int, addr : int, data : Union[int, list, tuple], reset = False, echo = False) -> bool:
    """WRITE one or more 8-bit values (see B2Bs for the encoding)."""
    return self.WRITE(id, addr, B2Bs(data), reset, echo)

  def WRITE16(self, id : int, addr : int, data : Union[int, list, tuple], reset = False, echo = False) -> bool:
    """WRITE one or more little-endian 16-bit values (see W2Bs)."""
    return self.WRITE(id, addr, W2Bs(data), reset, echo)

  def WRITE32(self, id : int, addr : int, data : Union[int, list, tuple], reset = False, echo = False) -> bool:
    """WRITE one or more little-endian 32-bit values (see L2Bs)."""
    return self.WRITE(id, addr, L2Bs(data), reset, echo)

  def READ(self, id : int, addr : int, length : int, reset = False, echo = False) -> bytes:
    """Read *length* bytes (1..250) from address *addr* of servo *id* (0..254).

    Returns the data bytes, or None on invalid arguments, timeout or a
    reply that does not match the request.
    """
    with self.__lock:
      if not (0 <= id <= 254 and 0 <= addr <= 0xff and 1 <= length <= 250):
        return None
      if not self.TxPacket(self.OPT_READ, 0x80 if reset else 0x00, B2Bs(id) + B2Bs(addr) + B2Bs(length), echo)[1]:
        return None
      d, r = self.RxPacket(echo)
      if r and d[1] == (self.OPT_READ | 0x80) and id == d[3] and len(d) == length + 5:
        return bytes(d[4:-1])
      return None

  def __read_values(self, id, addr, fmt, width, length, reset, echo):
    # Shared body of READ8/READ16/READ32: read `length` items of `width`
    # bytes each and decode them with the explicit little-endian `fmt`.
    raw = self.READ(id, addr, width * length, reset, echo)
    if raw is None:
      return None
    values = tuple(v for (v,) in iter_unpack(fmt, raw))
    return values if length > 1 else values[0]

  def READ8(self, id : int, addr : int, length = 1, signed = False, reset = False, echo = False) -> Union[int, tuple, None]:
    """Read *length* 8-bit values; an int for one value, a tuple for more, None on failure."""
    return self.__read_values(id, addr, '<b' if signed else '<B', 1, length, reset, echo)

  def READ16(self, id : int, addr : int, length = 1, signed = False, reset = False, echo = False) -> Union[int, tuple, None]:
    """Read *length* little-endian 16-bit values; an int for one value, a tuple for more, None on failure."""
    return self.__read_values(id, addr, '<h' if signed else '<H', 2, length, reset, echo)

  def READ32(self, id : int, addr : int, length = 1, signed = False, reset = False, echo = False) -> Union[int, tuple, None]:
    """Read *length* little-endian 32-bit values; an int for one value, a tuple for more, None on failure."""
    return self.__read_values(id, addr, '<i' if signed else '<I', 4, length, reset, echo)

  def __id_only_command(self, cmd, id, echo):
    # Shared body of LOAD and SAVE: the payload is just the ID and the
    # expected reply is the 5-byte minimum packet from that ID.
    with self.__lock:
      if not 0 <= id <= 255:
        return False
      if not self.TxPacket(cmd, 0x0, B2Bs(id), echo)[1]:
        return False
      if id == self.BROADCASTING_ID:
        return True
      d, r = self.RxPacket(echo)
      return r and id == d[3] and d[1] == (cmd | 0x80) and len(d) == 5

  def LOAD(self, id : int, echo = False) -> bool:
    """Send the LOAD command to servo *id*; True on acknowledgement (or for broadcast)."""
    return self.__id_only_command(self.OPT_LOAD, id, echo)

  def SAVE(self, id : int, echo = False) -> bool:
    """Send the SAVE command to servo *id*; True on acknowledgement (or for broadcast)."""
    return self.__id_only_command(self.OPT_SAVE, id, echo)

  def POSITION(self, id : int, pos : int, tim : int, reset = False, echo = False) -> int:
    """Send a position command and return the signed 16-bit value from the reply.

    pos -- target, -32000..32000.
    tim -- 0..65535, sent as an unsigned 16-bit word after the position.

    Returns None for invalid arguments, for the broadcast ID (no reply is
    read) and when no matching reply arrives.
    """
    with self.__lock:
      if not (0 <= id <= 255 and -32000 <= pos <= 32000 and 0 <= tim <= 65535):
        return None
      if not self.TxPacket(self.OPT_POSITION, 0x80 if reset else 0x00, B2Bs(id) + W2Bs(pos) + W2Bs(tim), echo)[1]:
        return None
      if id == self.BROADCASTING_ID:
        return None
      dat, r = self.RxPacket(echo)
      if r and dat[1] == (self.OPT_POSITION | 0x80) and dat[3] == id and len(dat) == 7:
        return unpack('<h', dat[4:6])[0]
      return None

  def POSITIONs(self, idposs : Union[list, tuple], tim : int, reset = False, echo = False) -> bool:
    """Send one position command addressed to several servos.

    *idposs* is a sequence of ``(id, pos)`` pairs with id in 0..254 and pos
    in -32000..32000; *tim* (0..65535) is shared by all of them.  No reply
    is read; the return value only says whether the packet was sent.
    """
    with self.__lock:
      if not (0 <= tim <= 65535 and len(idposs) > 0):
        return False
      parts = []
      for idpos in idposs:
        if len(idpos) != 2:
          return False
        sid, pos = idpos
        if not (0 <= sid <= 254 and -32000 <= pos <= 32000):
          return False
        parts.append(B2Bs(sid) + W2Bs(pos))
      parts.append(W2Bs(tim))
      return self.TxPacket(self.OPT_POSITION, 0x80 if reset else 0x00, b''.join(parts), echo)[1]

  def RESET(self, id : int, tim : int, echo = False) -> bool:
    """Send the RESET command with parameter *tim* (0..255) to servo *id*.

    No reply is read; returns True when the packet was sent.
    """
    with self.__lock:
      if not (0 <= id <= 255 and 0 <= tim <= 255):
        return False
      return self.TxPacket(self.OPT_RESET, 0x0, B2Bs(id) + B2Bs(tim), echo)[1]

  def Ping(self, id : int, echo = False) -> bool:
    """Check whether servo *id* (0..254) answers a one-byte read of address 0.

    Up to three attempts are made; True as soon as one succeeds.
    """
    if not 0 <= id <= 254:
      return False
    for _ in range(3):
      if self.READ8(id, 0, 1, False, echo) is not None:
        return True
    return False

##########################################################
# test code
##########################################################
if __name__ == "__main__":
  # Hardware smoke test: exercises RESET/LOAD/SAVE/READ/WRITE and a position
  # sweep against the servo with ID 0 on /dev/ttyAMA0 at 1 Mbps.
  import time
  from pyB3M import *

  try:
    b3m = B3MProtocol('/dev/ttyAMA0', 1000000, 1)
  except Exception as e:
    # Report why the port could not be opened instead of skipping silently.
    print('err:open', e)
  else:
    ID = 0
    ec = True

    # r/w test
    print('reset=',b3m.RESET(ID, 0, echo = ec))
    time.sleep(0.5)
    print('load=',b3m.LOAD(ID, echo = ec))
    print('save=',b3m.SAVE(ID, echo = ec))
    # READ returns None when the servo does not answer; print that as is
    # rather than failing on None.hex().
    head = b3m.READ(ID, 0, 5, echo = ec)
    print('read 5~9=',head.hex(',') if head is not None else None)
    print('read32 0=',b3m.READ32(ID, 0, echo = ec))
    print('read16 5=',b3m.READ16(ID, 5, echo = ec))
    print('read16 7=',b3m.READ16(ID, 7, echo = ec))
    print('read16 0x50=',b3m.READ16(ID, 0x50, echo = ec))

    # r/w pid gain
    gain = b3m.READ16(ID, 0x5e, 3, echo = ec)
    if gain is None:
      # Without the values read back there is nothing to write again.
      print('err:read pid')
    else:
      gain = list(gain)
      print('pid r=',gain)
      print('pid w=',b3m.WRITE16(ID, 0x5e, gain, echo = ec))
      print('pid r=',b3m.READ16(ID, 0x5e, 3, echo = ec))

      # write (multi): the same gain values addressed to IDs ID..ID+3
      d = (B2Bs(ID)+W2Bs(gain), B2Bs(ID+1)+W2Bs(gain), B2Bs(ID+2)+W2Bs(gain), B2Bs(ID+3)+W2Bs(gain),)
      print(d)
      print('writes=',b3m.WRITEs(0x5e, d, echo = ec))

    # motor ctrl
    #  set free
    if b3m.WRITE8(ID, 0x28, 2, echo = ec):
      #  sel pos mode
      if b3m.WRITE8(ID, 0x28, 2, echo = ec):
        #  sel profile
        if b3m.WRITE8(ID, 0x29, 1, echo = ec):
          #  set normal
          if b3m.WRITE8(ID, 0x28, 0, echo = ec):
            #  position: step through the targets, monitoring for 1 s after each
            for ang in (0, 4500, 9000, 4500, 0, -4500, -9000, -4500, 0):
              b3m.POSITION(ID, ang, 500)
              s = time.time() + 1
              while s > time.time():
                # 48 bytes starting at 0x27, decoded as one little-endian record
                r = b3m.READ(ID, 0x27, 48)
                if r != None:
                  n = tuple(iter_unpack('<BHBBhhhhhhHHHhIHhhHHHHiB', r))[0]
                  print(f'READ(0x27~0x56)={ang:7d}{n[5]:7d}{n[8]:7d} {n[16]:5d}{n[17]:5d}{n[18]:7d}{n[19]:7d}', end='\r')
                else:
                  print('READ(0x27~0x56)=xxxxx', end='\r')
          else:
            print('err:set normal')
        else:
          print('err:set profile')
      else:
        print('err:set pos mode')
    else:
      print('err:set free')
    print()
    # set free
    b3m.WRITE8(ID, 0x28, 2)

    del b3m

  print('fin')
Page Top
全検索

IDやボーレートが分からない場合の検索。kbhit要。
B3M側の設定と異なるボーレートで疎通確認を行うため、B3Mの内部エラーが誘発されるのと、B3Mがパケットを誤認して設定が変わってしまうなどのリスクがあるので、あまり使うべきではない。

#!/usr/bin/python3
#
# scan.py

import sys, kbhit, time
from pyB3M import B3MProtocol

# Probe every servo ID (0..254) at each candidate baud rate and report the
# IDs that answer a Ping.
kb = kbhit.KBHit()
try:
  # Port: the single command-line argument if exactly one is given, else
  # /dev/ttyAMA0.  The 0.02 s timeout bounds each reply wait.
  b3m = B3MProtocol(sys.argv[1] if len(sys.argv) == 2 else '/dev/ttyAMA0', 115200, 0.02)
  for b in (115200, 1000000, 1250000, 1500000, 2000000, 3000000):
    b3m.baudrate = b
    for id in range(255):
      # A pending key is consumed and ends the ID loop for this baud rate
      # only; the outer loop then continues with the next rate.
      # NOTE(review): if a key press is meant to abort the whole scan, the
      # outer loop has to stop as well - confirm the intent.
      if kb.kbhit():
        k = kb.getch()
        break
      # The progress line is overwritten in place ('\r') unless the ID
      # answered, in which case 'find' is appended and the line is kept.
      print(f' baud:{b:7} id:{id:3} ', end='find\n' if b3m.Ping(id) else '\r', flush=True)
    time.sleep(0.5)
  # ANSI escapes: erase the current line, then move the cursor to column 1.
  sys.stdout.write('\033[2K\033[1G')
except:
  # Any failure, including the port failing to open, ends the scan with a
  # generic message.
  print(' ERR:There is some problem.')
Page Top
ID変更

任意のIDへ変更。

#!/usr/bin/python3
#
# changeid.py

# Note that since processing begins after LOAD is performed,
# various parameters are the same as immediately after startup.

import os, sys
from pyB3M import B3MProtocol

# Accepted baud rates.  Only membership matters in this script: an
# unsupported rate makes the lookup below raise and is reported as an error.
baudlist = { 115200 : 0, 1000000 : 1, 1250000 : 2, 1500000 : 3, 2000000 : 4, 3000000 : 5 }

# Command line: [line] <baudrate> <prev id> <new id>.  With three arguments
# the default device is used; with four the first one names the device.
if len(sys.argv) == 4:
  arg = sys.argv[1:]
  dev = '/dev/ttyAMA0'
elif len(sys.argv) == 5:
  arg = sys.argv[2:]
  dev = sys.argv[1]
else:
  arg = []
  dev = ''

# A device path that does not exist is treated like a wrong argument count.
if not os.path.exists(dev):
  dev = ''

if dev != '':
  try:
    # Any parse error, unsupported baud rate or port-open failure lands in
    # the except branch below.
    baud = int(arg[0])
    baudind = baudlist[baud]
    previd = int(arg[1])
    newid = int(arg[2])
    b3m = B3MProtocol(dev, baud, timeout = 0.1)
  except:
    print(' ERR:There is some problem.')
  else:
    if newid >= 0 and newid <= 254 and previd >= 0 and previd <= 254:
      # Spell to eliminate the habit of poor readout of B3M
      b3m.Ping(previd)
      # Directs LOAD to the previous ID regardless of its presence or absence.
      b3m.LOAD(previd)
      # Confirm that the ID to be changed does not exist
      if not b3m.Ping(newid):
        # I don't know if it's necessary, but I'll put it in a free state.
        if b3m.WRITE8(previd, 0x28, 2):
          # Write new ID (It is an error because the response is changed with the new ID.)
          # The return value is therefore ignored; success is judged by the
          # Ping of the new ID that follows.
          dum = b3m.WRITE8(previd, 0, newid)
          if b3m.Ping(newid):
            # SAVE
            if b3m.SAVE(newid):
              print(' OK')
            else:
              # SAVE was not acknowledged by the new ID.
              print(' NG -1')
          else:
            # The servo does not answer under the new ID.
            print(' NG -2')
        else:
          print(f' ERR: Device with ID:{previd} not found with BAUDRATE:{baud}')
      else:
        # Something already answers on the requested new ID.
        print(' ERR:Competing.')
    else:
      # One of the IDs is outside 0..254.
      print(' ERR:There is some problem.')
else:
  print(' usage: changeid [line] <baudrate> <prev id> <new id>')
Page Top
ボーレート変更

任意のボーレートへ変更。
セオリー通りの手順なのだが、手持ちのB3Mでは保存されないことが頻出して嫌気がさしている。

#!/usr/bin/python3
#
# changebaud.py

# Note that since processing begins after LOAD is performed,
# various parameters are the same as immediately after startup.

import os, sys
from pyB3M import B3MProtocol

# Accepted baud rates.  Only membership matters in this script: an
# unsupported rate makes the lookups below raise and is reported as an error.
baudlist = { 115200 : 0, 1000000 : 1, 1250000 : 2, 1500000 : 3, 2000000 : 4, 3000000 : 5 }

# Command line: [line] <baudrate> <id> <new baudrate>.  With three arguments
# the default device is used; with four the first one names the device.
if len(sys.argv) == 4:
  arg = sys.argv[1:]
  dev = '/dev/ttyAMA0'
elif len(sys.argv) == 5:
  arg = sys.argv[2:]
  dev = sys.argv[1]
else:
  arg = []
  dev = ''

# A device path that does not exist is treated like a wrong argument count.
if not os.path.exists(dev):
  dev = ''

if dev != '':
  try:
    # Any parse error, unsupported baud rate or port-open failure lands in
    # the except branch below.  The port is opened at the NEW rate first so
    # that the conflict check can run before anything is changed.
    prevbaud = int(arg[0])
    prevind = baudlist[prevbaud]
    id = int(arg[1])
    newbaud = int(arg[2])
    newind = baudlist[newbaud]
    b3m = B3MProtocol(dev, newbaud, timeout = 0.1)
  except:
    print(' ERR: There is some problem.')
  else:
    # Verify that no devices exist at the changed baud rate
    if not b3m.Ping(id):
      # Switch to the current rate of the servo for the actual work.
      b3m.baudrate = prevbaud
      # Spell to eliminate the habit of poor readout of B3M
      b3m.Ping(id)
      # Directs LOAD to the previous ID regardless of its presence or absence.
      b3m.LOAD(id)
      # I don't know if it's necessary, but I'll put it in a free state.
      if b3m.WRITE8(id, 0x28, 2):
        # Write new baudrate: the rate itself, as a 32-bit value, to
        # address 1.  The return value is ignored; the Ping that follows is
        # still done at the previous rate.
        dum = b3m.WRITE32(id, 1, newbaud)
        if b3m.Ping(id):
          # SAVE
          if b3m.SAVE(id):
            # Reset the servo, then look for it at the new rate.
            b3m.RESET(id, 0)
            b3m.baudrate = newbaud
            if b3m.Ping(id):
              print(' OK')
            else:
              # No answer at the new rate after the reset.
              print(' NG -1')
          else:
            # SAVE was not acknowledged.
            print(' NG -2')
        else:
          # The servo stopped answering at the previous rate after the write.
          print(' NG -3')
      else:
        print(f' ERR: Device with ID:{id} not found with BAUDRATE:{prevbaud}')
    else:
      # Something already answers with this ID at the new baud rate.
      print(' ERR: Competing.')
else:
  print(' usage: changebaud [line] <baudrate> <id> <new baudrate>')

Front page   Diff ReloadPrint View   Page list Search Recent changes   RSS of recent changes (RSS 1.0) RSS of recent changes (RSS 2.0) RSS of recent changes (RSS Atom)
Last-modified: 2024-04-22 (Mon) 14:34:47 (JST) (173d)