PMXシリーズ anchor.png

pmx_test.png

動かすだけならここだけおさえておけば良しという事で、公式サイトのリンクをブックマーク。
PMXシリーズ
PMXサーボシリーズオンラインマニュアル

Page Top

下準備 anchor.png

B3Mシリーズと大差ないと思いきや、コネクタが全く違う。さらに標準のケーブルがぶっとい。DXHUBにつなぎたかったので、コレを使ってケーブルを自作。

EH_DF1B.png

ひとまず電源は手持ちのACアダプタを使う事にしたが、結果的に市販の物では全く容量足りず。トラブルを避けるには、ガッツリ電流取れる電池か安定化電源用意した方が良いだろう。

聞くところによるとファームウェアは日々更新されているらしく、必ず新しいバージョンを適用して使うべしとの事。
確かにそのままでは後に紹介するスクリプトがエラーになる事があったので、有無も言わさず更新すべきかと。

Page Top

雑感 anchor.png

上澄みだけさらった感触を書いてみる。

  • KRS時代には無かった単純なPIDによる角度・速度・電流の制御
  • 角度・速度・電流を組み合わせたカスケード制御が可能
  • 分解能の高い非接触角度センサを装備し、個々に高精度なキャリブレーションがなされている
  • 一般的なUARTの通信プロトコル(8N1)
  • シンプルな通信プロトコルで、大抵の機能へのアクセスはメモリマップ上に配置された情報の読み書きで済む
  • B3M程の発熱は感じず
  • メモリマップの読み書きだけでも運転可能
  • パケットの応答時間がどこぞやと異なり、かなり一定
  • フィールドでのファームウェアのパッチ当てが可能

DXLやB3Mシリーズを使ったことがあれば運用は簡単。PICのイメージが色濃いKRSとは一線を画すので、それに慣れ切っていると難しい。
B3Mからの仕切り直し感を感じるので、種々のリクエストを吸収してもらえる事を大いに期待し、初期リリース版に触れて感じた点も書いておく。

  • 3つの制御モード全てを組み合わせたカスケード制御は不可
  • ゲインにフィードフォワードが無い
    足回りなどで常時オフセットが生じる速度制御をメインで使う時には結構有用なのだが
  • パケット上アドレスは16bit幅だがデータ長が8bit指定なため、全メモリの一括読み出しは不可
    メモリマップ全てを仮想化するには分割して読むしかなく
  • 複数軸を同期させる方法の記述が無いが、いずれB3Mのマルチみたいなものが追加されるのか?
  • エラーフラグを読み出さないといけないのが少々煩雑
  • そこそこ電源容量が必要なためACアダプタによる運用は注意が必要
    実際に秋月のACアダプタでも運用しきれなかった
  • 今後はBLDCなのかな。。。
Page Top

テストコード anchor.png

Pythonばかり使っていると悪い頭が益々悪くなりそうだし鬼のように遅いが、多勢に無勢。あれやこれやと説明せずとも大抵の事はヌルッとイケるので、ここでも動作確認用に使う事にした。

Page Top

Python anchor.png

ここら辺にある事は既知だが、それをあえて見ずに基本的なコマンドを関数にしたクラスを作った(Python3.9以降で動作する事を確認)。
ホントはDXL用のスクリプトを作ったついでにあつらえたのだが、メモリマップ上に配置された情報を読み書きするという概念は同じなので、あっさりと実現。
なおbytesを直接的に扱うのは避けたかったので、一部を除きできるだけint型とリストやタプルで記述できるようにした。使い方は末尾のデモコードを参考にすれば大半は理解できるかと思う。
なお連続で通信する場合は2バイト以上の間隔を置けと注意事項に記されているのだが、ここではそれを一切考慮していない。将来奇跡的にホスト側の処理スピードが速くなり、前後のパケットが近接するようになったら考える事にする。

ソースへの直リンク

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# pyPMX.py
# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: (C) 2024 mukyokyo

import serial, threading, array, struct
from typing import Union
from collections import namedtuple
from struct import pack, unpack, iter_unpack

##########################################################
# Functionalized the part of converting int to bytes.
# If specified as a tuple, it is converted to bytes at once.
##########################################################
def B2Bs(d) --> bytes:
  if isinstance(d, list) or isinstance(d, tuple):
    return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d)
  else:
    return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d))

def W2Bs(d) --> bytes:
  if isinstance(d, list) or isinstance(d, tuple):
    return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]])
  else:
    return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d))

def L2Bs(d) --> bytes:
  if isinstance(d, list) or isinstance(d, tuple):
    return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]])
  else:
    return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d))

##########################################################
# API for Kondo PMX
##########################################################
class PMXProtocol:
  BROADCASTING_ID  = 0xff
  CMD_MemREAD      = 0xa0
  CMD_MemWRITE     = 0xa1
  CMD_LOAD         = 0xa2
  CMD_SAVE         = 0xa3
  CMD_MotorREAD    = 0xa4
  CMD_MotorWRITE   = 0xa5
  CMD_SystemREAD   = 0xbb
  CMD_SystemWRITE  = 0xbc
  CMD_ReBoot       = 0xbd
  CMD_FactoryReset = 0xbe
  CMD_SystemINIT   = 0xbf

  (SYSW_BAUD_57600, SYSW_BAUD_115_2k, SYSW_BAUD_625k, SYSW_BAUD_1M, SYSW_BAUD_1_25M, SYSW_BAUD_1_5M, SYSW_BAUD_2M, SYSW_BAUD_3M) = range(8)
  (SYSW_PARITY_NONE, SYSW_PARITY_ODD, SYSW_PARITY_EVEN) = range(3)
  (MOTW_OPT_NONE, MOTW_OPT_TORQUEON, MOTW_OPT_FREE, _, MOTW_OPT_BRAKE, _, _, _, MOTW_OPT_HOLD) = range(9)

  __crc16_lutable = array.array('H')

  def __init__(self, port : Union[serial.Serial, str], baudrate = 57600, timeout = 0.05, lock = None):
    if isinstance(port, serial.Serial):
      self.__serial = port
      self.__baudrate = port.baudrate
      self.__timeout = port.timeout
    else:
      self.__serial = serial.Serial(port, baudrate = baudrate, timeout = timeout)
      self.__baudrate = self.__serial.baudrate
      self.__timeout = self.__serial.timeout

    if lock == None:
      self.__lock = threading.Lock()
    else:
      self.__lock = lock
    self.__status = 0
    poly = 0x1021
    for i in range(256):
      nAccum = i << 8
      for j in range(8):
        nAccum = (poly ^ (nAccum << 1) if nAccum & 0x8000 else nAccum << 1) & 0xffff
      self.__crc16_lutable.append(nAccum)

  def __del__(self):
    del self.__crc16_lutable[:]

  @property
  def lock(self):
    return self.__lock

  @property
  def baudrate(self):
    return self.__serial.baudrate

  @baudrate.setter
  def baudrate(self, baudrate):
    self.__baudrate = baudrate
    self.__serial.baudrate = baudrate

  @property
  def timeout(self):
    return self.__serial.timeout

  @timeout.setter
  def timeout(self, timeout):
    self.__timeout = timeout
    self.__serial.timeout = timeout

  def __reconfig(self):
    self.__serial.baudrate = self.__baudrate
    self.__serial.timeout = self.__timeout

  @property
  def status(self):
    return self.__status

  def __crc16(self, data : bytes) --> int:
    crc = 0
    for d in data:
      crc = (crc << 8) ^ self.__crc16_lutable[((crc ->-> 8) ^ d) & 0xff]
    return crc & 0xffff

  def TxPacket(self, id : int, cmd : int, opt : int, param : bytes, echo = False) --> (bytes, bool):
    self.__reconfig()
    if ((id ->= 0 and id <= 239) or id == self.BROADCASTING_ID) and len(param) <= (256 - 8):
      txp = bytes([0xfe,0xfe,id,len(param) + 8,cmd,opt]) + bytes(param)
      txp += W2Bs(self.__crc16(txp))
      self.__serial.reset_input_buffer()
      if echo: print('TX:', txp.hex(':'))
      self.__serial.write(txp)
      return txp, True
    return None, False

  def __rx(self, length) --> bytes:
    s = self.__serial.read(length)
    l = len(s)
    if l == length:
      return s
    else:
      r = s
      length -= l
      while self.__serial.in_waiting -> 0:
        s = self.__serial.read(length)
        r += s
        length -= len(s)
        if length <= 0:
          break
      return r

  def RxPacket(self, echo = False) --> (bytes, bool):
    rxp = self.__rx(6)
    if rxp:
      if len(rxp) == 6:
        if rxp[3] -> 0:
          l = rxp[3] - 6
          if l -> 0:
            rxp += self.__rx(l)
            if len(rxp) == l + 6 and rxp[0] == 0xfe and rxp[1] == 0xfe and unpack('<H', rxp[-2:])[0] == self.__crc16(rxp[:-2]):
              self.__status = rxp[5]
              if echo: print('RX:', rxp.hex(':'))
              return bytes(rxp), True
    return None, False

  def MemWRITE(self, id : int, addr : int, data : bytes, echo = False) --> bool:
    with self.__lock:
      if ((id ->= 0 and id <= 239) or id == self.BROADCASTING_ID) and addr ->= 0 and addr <= 0x4ff:
        if self.TxPacket(id, self.CMD_MemWRITE, 1, W2Bs(addr) + data, echo)[1]:
          if id != self.BROADCASTING_ID:
            d, r = self.RxPacket(echo)
            if r:
              return (d[2] == id) and (d[4] == 0x21)
          else:
            return True
      return False

  def MemWRITE8(self, id : int, addr : int, data : Union[int, list, tuple], echo = False) --> bool:
    return self.MemWRITE(id, addr, B2Bs(data), echo)

  def MemWRITE16(self, id : int, addr : int, data : Union[int, list, tuple], echo = False) --> bool:
    return self.MemWRITE(id, addr, W2Bs(data), echo)

  def MemWRITE32(self, id : int, addr : int, data : Union[int, list, tuple], echo = False) --> bool:
    return self.MemWRITE(id, addr, L2Bs(data), echo)

  def MemREAD(self, id : int, addr : int, length : int, echo = False) --> bytes:
    with self.__lock:
      if addr ->= 0 and id <= 239 and addr ->= 0 and addr <= 0x4ff and length -> 0 and length -> 0 and length <= 247:
        if self.TxPacket(id, self.CMD_MemREAD, 0, W2Bs(addr) + B2Bs(length), echo)[1]:
          d, r = self.RxPacket(echo)
          if r:
            if d[4] == 0x20 and id == d[2] and (d[5] & (4 + 8 + 0x10 + 0x40)) == 0:
              return bytes(d[6:-2])
      return None

  def MemREAD8(self, id : int, addr : int, length = 1, signed = False, echo = False) --> int:
    r = self.MemREAD(id, addr, length, echo)
    if r != None:
      n = sum(iter_unpack('b' if signed else 'B', r), ())
      return n if length -> 1 else n[0]
    return None

  def MemREAD16(self, id : int, addr : int, length = 1, signed = False, echo = False) --> int:
    r = self.MemREAD(id, addr, 2 * length, echo)
    if r != None:
      n = sum(iter_unpack('h' if signed else 'H', r), ())
      return n if length -> 1 else n[0]
    return None

  def MemREAD32(self, id : int, addr : int, length = 1, signed = False, echo = False) --> int:
    r = self.MemREAD(id, addr, 4 * length, echo)
    if r != None:
      n = sum(iter_unpack('i' if signed else 'I', r), ())
      return n if length -> 1 else n[0]
    return None

  def LOAD(self, id : int, echo = False) --> bool:
    with self.__lock:
      if self.TxPacket(id, self.CMD_LOAD, 0, (), echo)[1]:
        if id != self.BROADCASTING_ID:
          d, r = self.RxPacket(echo)
          if r:
            return id == d[2] and d[4] == 0x22
        else:
          return True;
    return False

  def SAVE(self, id : int, echo = False) --> bool:
    with self.__lock:
      if self.TxPacket(id, self.CMD_SAVE, 0, (), echo)[1]:
        if id != self.BROADCASTING_ID:
          d, r = self.RxPacket(echo)
          if r:
            return id == d[2] and d[4] == 0x23
        else:
          return True;
      return False

  def MotorREAD(self, id : int, echo = False) --> tuple:
    with self.__lock:
      if self.TxPacket(id, self.CMD_MotorREAD, 0, (), echo):
        if id != self.BROADCASTING_ID:
          dat, r = self.RxPacket(echo)
          if r:
            return tuple([dat[6], tuple(n[0] for n in iter_unpack('<h', dat[7:-2])) if len(dat[7:-2]) -> 0 and dat[4] == 0x24 else ()])
          else:
            return None
        else:
          return ()
      return None

  def MotorWRITE(self, id : int, opt : int, dat : (), echo = False) --> tuple:
    with self.__lock:
      if self.TxPacket(id, self.CMD_MotorWRITE, opt, W2Bs(dat), echo):
        if id != self.BROADCASTING_ID:
          dat, r = self.RxPacket(echo)
          if r:
            return tuple([dat[6], tuple(n[0] for n in iter_unpack('<h', dat[7:-2])) if len(dat[7:-2]) -> 0 and dat[4] == 0x25 else ()])
          else:
            return None
        else:
          return ()
      return None

  def SystemREAD(self, id : int, echo = False) --> tuple:
    with self.__lock:
      if self.TxPacket(id, self.CMD_SystemREAD, 0, (), echo)[1]:
        d, r = self.RxPacket(echo)
        if r:
          if len(d[6:-2]) == 13 and d[4] == 0x3b:
            v = tuple(iter_unpack('<IIIB', d[6:-2]))[0]
            return tuple(v)
      return None

  def SystemWRITE(self, id : int, data : (), echo = False) --> bool:
    if data[0] ->= 0 and data[0] <= 239 and data[1] ->= 0 and data[1] <= 7 and data[2] ->= 0 and data[2] <= 2 and data[3] ->= 1 and data[3] <= 255:
      d = self.SystemREAD(id, echo)
      if d:
        with self.__lock:
          if self.TxPacket(id, self.CMD_SystemWRITE, 0xf, L2Bs(d[0]) + B2Bs(data), echo)[1]:
            d, r = self.RxPacket(echo)
            if r:
              return id == d[2] and d[4] == 0x3c and (d[5] & (4 + 8 + 0x10 + 0x20 + 0x40)) == 0
    return False

  def ReBoot(self, id : int, echo = False) --> bool:
    with self.__lock:
      if id != self.BROADCASTING_ID:
        if self.TxPacket(id, self.CMD_ReBoot, 0, W2Bs(0), echo)[1]:
          d, r = self.RxPacket(echo)
          if r:
            return d[2] == id and d[4] == 0x3d
      return False

  def FactoryReset(self, id : int, echo = False) --> bool:
    if id != self.BROADCASTING_ID:
      d = self.SystemREAD(id, echo)
      if d != None:
        with self.__lock:
          if self.TxPacket(id, self.CMD_FactoryReset, 0, L2Bs(d[0]), echo)[1]:
            d, r = self.RxPacket(echo)
            if r:
              return d[2] == id and d[4] == 0x3e
    return False

  def FullScan(self) --> tuple:
    orgtimeout = self.timeout
    orgbaudrate = self.baudrate
    baud=(57600,115200,625000,1000000,1250000,1500000,2000000,3000000)
    self.timeout = 0.005
    for b in baud:
      self.baudrate = b
      for id in range(240):
        r = self.MemREAD8(id, 0)
        print(f'baud:{b:7} id:{id:3} :', end='find\n' if r else 'none\r')
    self.timeout = orgtimeout
    self.baudrate = orgbaudrate

##########################################################
# test code
##########################################################
if __name__ == "__main__":
  import time
  from pyPMX import *

  try:
    pmx = PMXProtocol('\\\\.\\COM12', 3000000)
  except:
    pass
  else:
    ID = 0

    # reboot
    print('ReBoot=', pmx.ReBoot(ID, echo=True))
    time.sleep(0.5)
    '''
    # factory reset
    print('FactoryReset=',pmx.FactoryReset(ID))
    time.sleep(0.5)
    '''
    # load
    print('LOAD=', pmx.LOAD(ID, echo=True))
    # save
    print('SAVE=', pmx.SAVE(ID, echo=True))
    '''
    # sys write
    print('SysWrite=',pmx.SystemWRITE(ID, (ID, pmx.SYSW_BAUD_115_2k, pmx.SYSW_PARITY_NONE, 20)))
    time.sleep(0.5)
    '''
    # sys read
    r = pmx.SystemREAD(ID)
    print('SysRead=', f'${r[0]:08x} ${r[1]:08x} ${r[2]:08x} {r[3]:d} ' if r else 'False')
    # dump memory
    print('dump')
    for addr in range(0, 800, 20):
      r = pmx.MemREAD(ID, addr, 20)
      print(f' MemREAD({addr}:{20})=', r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!')
    # gain
    r = pmx.MemREAD(ID, 0, 64)
    print('gain\n MemREAD(0:64)=', *iter_unpack('<IIIIIIIIIIIIIIII', r) if r else '!!!!!!!!')
    # voltage limit
    r = pmx.MemREAD(ID, 76, 8)
    print('volt lim\n MemREAD(76:8)=', *iter_unpack('<HHHH', r) if r else '!!!!!!!!')
    # present value
    r = pmx.MemREAD(ID, 300, 24)
    print('present val\n MemREAD(300:20)=', *iter_unpack('<hhhhhhhhhHHH', r) if r else '!!!!!!!!')
    # error stat
    r = pmx.MemREAD(ID, 400, 6)
    print('err stat\n MemREAD(0:400:7)=', *iter_unpack('<BBBxH', r) if r else '!!!!!!!!')

    print('preparation for pos ctrl')
    # ctrl off

    print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ()))
    time.sleep(0.1)
    # 1:pos,2:speed,4:cur,8:torq,16:pwm,32:time
    print(' MemWRITE8(501)=', pmx.MemWRITE8(ID, 501, 1))
    print(' MemWRITE8(502)=', pmx.MemWRITE8(ID, 502, 0b11111))
    time.sleep(0.1)

    # goal angle
    print('start pos ctrl')
    print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_TORQUEON, ()))
    for ang in tuple(range(0, 32000, 500)) + tuple(range(32000, -32000, -500)) + tuple(range(-32000, 500, 500)):
      for n in range(10):
        r = pmx.MotorWRITE(ID, pmx.MOTW_OPT_NONE, (ang,))
        if r != None:
          if len(r) -> 0:
            print(f' MotorWRITE= {r[0]},', ''.join('{:6},'.format(k) for k in r[1]), 'stat=', pmx.status, end = '\r')
            pmx.MemREAD(ID, 400, 6)
        else:
          print(' MotorWRITE= !!!!!!!!!!!!!!!!!!!!!!!!')
          break
        time.sleep(0.005)
      else:
        continue
      pass
      break
    else:
      print()
    time.sleep(0.1)
    print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ()))



    print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ()))
    time.sleep(0.1)
    # 1:pos,2:speed,4:cur,8:torq,16:pwm,32:time
    print(' MemWRITE8(501)=', pmx.MemWRITE8(ID, 501, 2))
    time.sleep(0.1)

    # goal angle
    print('start speed ctrl')
    print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_TORQUEON, ()))
    for spd in tuple(range(0, 2000, 50)) + tuple(range(2000, -2000, -50)) + tuple(range(-2000, 50, 50)):
      for n in range(30):
        r = pmx.MotorWRITE(ID, pmx.MOTW_OPT_NONE, (spd,))
        if r != None:
          if len(r) -> 0:
            print(f' MotorWRITE= {r[0]},', ''.join('{:6},'.format(k) for k in r[1]), 'stat=', pmx.status, end = '\r')
            pmx.MemREAD(ID, 400, 6)
        else:
          print(' MotorWRITE= !!!!!!!!!!!!!!!!!!!!!!!!')
          break
        time.sleep(0.005)
      else:
        continue
      pass
      break
    else:
      print()
    time.sleep(0.1)
    print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ()))

    del pmx

  print('fin')

こちらでは主にここにあるWindows環境の未リリース版イメージで試しているが、基本的にはMSYS2をインストールしておけばその中で動かせる。もちろんLinuxやmacでも動くので、気楽に試してみてはどうだろう。

Page Top
全検索 anchor.png

IDやボーレートが分からないと何も始められないので、まずは検索ができると何かと便利。
検索はライブラリに入れておいた機能を呼び出すだけなので、次のコードで簡単に利用できる。

#!/usr/bin/python3
import os, sys
from pyPMX import PMXProtocol

dev = '\\\\.\\COM12' if len(sys.argv) == 1 else sys.argv[1]
if not os.path.exists(dev):
  dev = ''

if dev != '':
  try:
    pmx = PMXProtocol(dev, 57600)
    pmx.FullScan()
  except:
    print(' ERR:There is some problem.')
    pass
  sys.stdout.write('\033[2K\033[1G')
else:
  print(' usage: scan [line]')
Page Top
ID変更 anchor.png

ネットワーク上のPMXは各々個別のIDが割り振られている必要があり、同一のものが存在している事を許容していない。
次のコードは任意のIDへ変更するものだが、少しでも疑義が生じたらIDの変更を行わないようにしている。

#!/usr/bin/python3
import os, sys
from pyPMX import PMXProtocol

baudlist = { 57600 : 0, 115200 : 1, 625000 : 2, 1000000 : 3, 1250000 : 4, 1500000 : 5, 2000000 : 6, 3000000 : 7 }

if len(sys.argv) == 4:
  arg = sys.argv[1:]
  dev = '\\\\.\\COM12'
elif len(sys.argv) == 5:
  arg = sys.argv[2:]
  dev = sys.argv[1]
else:
  arg = []
  dev = ''

if not os.path.exists(dev):
  dev = ''

if dev != '':
  try:
    baud = int(arg[0])
    baudind = baudlist[baud]
    previd = int(arg[1])
    newid = int(arg[2])
    pmx = PMXProtocol(dev, baud)
  except:
    print(' ERR:There is some problem.')
  else:
    if newid >= 0 and newid <= 239:
      if pmx.MemREAD8(newid, 0) == None:
        s = (newid, baudind, pmx.SYSW_PARITY_NONE, 20)
        if pmx.MotorWRITE(previd, pmx.MOTW_OPT_FREE, ()) != None:
          if pmx.SystemWRITE(previd, s):
            print(' OK')
          else:
            print(' NG')
        else:
          print(f' ERR: Device with ID:{previd} not found with BAUDRATE:{baud}')
      else:
        print(' ERR:Competing.')
    else:
      print(' ERR:There is some problem.')
else:
  print(' usage: changeid [line] <baudrate> <prev id> <new id>')
Page Top
ボーレート変更 anchor.png

次のコードは任意のボーレートへ変更するものだが、少しでも疑義が生じたらボーレートの変更を行わないようにしている。

#!/usr/bin/python3
import os, sys
from pyPMX import PMXProtocol

baudlist = { 57600 : 0, 115200 : 1, 625000 : 2, 1000000 : 3, 1250000 : 4, 1500000 : 5, 2000000 : 6, 3000000 : 7 }

if len(sys.argv) == 4:
  arg = sys.argv[1:]
  dev = '\\\\.\\COM12'
elif len(sys.argv) == 5:
  arg = sys.argv[2:]
  dev = sys.argv[1]
else:
  arg = []
  dev = ''

if not os.path.exists(dev):
  dev = ''

if dev != '':
  try:
    prevbaud = int(arg[0])
    prevind = baudlist[prevbaud]
    id = int(arg[1])
    newbaud = int(arg[2])
    newind = baudlist[newbaud]
    pmx = PMXProtocol(dev, newbaud)
  except:
    print(' ERR: There is some problem.')
  else:
    if pmx.MemREAD8(id, 0) == None:
      pmx.baudrate = prevbaud
      if pmx.MotorWRITE(id, pmx.MOTW_OPT_FREE, ()) != None:
        s = (id, newind, pmx.SYSW_PARITY_NONE, 20)
        if pmx.SystemWRITE(id, s):
          print(' OK')
        else:
          print(' NG')
      else:
        print(f' ERR: Device with ID:{id} not found with BAUDRATE:{prevbaud}')
    else:
      print(' ERR: Competing.')
else:
  print(' usage: changebaud [line] <baudrate> <id> <new baudrate>')
Page Top

MicroPython anchor.png

PicoSHIELD+Raspberry Pi Picoを対象にPythonのコードを作ってみた。

# pyPMX.py for MicroPython(RP2040)
# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: (C) 2024 mukyokyo

from machine import UART, Pin
import time, array
from collections import namedtuple
from struct import pack, unpack

##########################################################
# Functionalized the part of converting int to bytes.
# If specified as a tuple, it is converted to bytes at once.
##########################################################
@micropython.native
def B2Bs(d) -> bytes:
  if isinstance(d, list) or isinstance(d, tuple):
    return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d)
  else:
    return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d))

@micropython.native
def W2Bs(d) -> bytes:
  if isinstance(d, list) or isinstance(d, tuple):
    return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]])
  else:
    return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d))

@micropython.native
def L2Bs(d) -> bytes:
  if isinstance(d, list) or isinstance(d, tuple):
    return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]])
  else:
    return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d))

##########################################################
# API for Kondo PMX
##########################################################
class PMXProtocol:
  BROADCASTING_ID  = const(0xff)
  CMD_MemREAD      = const(0xa0)
  CMD_MemWRITE     = const(0xa1)
  CMD_LOAD         = const(0xa2)
  CMD_SAVE         = const(0xa3)
  CMD_MotorREAD    = const(0xa4)
  CMD_MotorWRITE   = const(0xa5)
  CMD_SystemREAD   = const(0xbb)
  CMD_SystemWRITE  = const(0xbc)
  CMD_ReBoot       = const(0xbd)
  CMD_FactoryReset = const(0xbe)
  CMD_SystemINIT   = const(0xbf)

  (SYSW_BAUD_57600, SYSW_BAUD_115_2k, SYSW_BAUD_625k, SYSW_BAUD_1M, SYSW_BAUD_1_25M, SYSW_BAUD_1_5M, SYSW_BAUD_2M, SYSW_BAUD_3M) = range(8)
  (SYSW_PARITY_NONE, SYSW_PARITY_ODD, SYSW_PARITY_EVEN) = range(3)
  (MOTW_OPT_NONE, MOTW_OPT_TORQUEON, MOTW_OPT_FREE, _, MOTW_OPT_BRAKE, _, _, _, MOTW_OPT_HOLD) = range(9)

  __crc16_lutable = array.array('H',(
    0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
    0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de,
    0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485,0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d,
    0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4,0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc,
    0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823,0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b,
    0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12,0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a,
    0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41,0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49,
    0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70,0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78,
    0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f,0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067,
    0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e,0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256,
    0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d,0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405,
    0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c,0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634,
    0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab,0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3,
    0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a,0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92,
    0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9,0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x01ce0,0xcc1,
    0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8,0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0
  ))

  def __init__(self, baudrate = 115200, timeout_ms = 50):
    """
    Initalize

    parameters
    -------------
    baudrate : int
      Serial baudrate[bps]
    timeout_ms : float
      Read timeout[s]
    """
    self.__Error = 0
    self.__baudrate = baudrate
    self.__timeout = timeout_ms
    self.__status = 0
    self.__serial = UART(1, baudrate, tx = Pin(4), rx = Pin(5), txbuf = 500, rxbuf = 500, timeout = timeout_ms)

  @property
  def baudrate(self):
    return self.__baudrate

  @baudrate.setter
  def baudrate(self, baudrate):
    self.__baudrate = baudrate
    self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout)

  @property
  def timeout(self):
    return self.__timeout

  @timeout.setter
  def timeout(self, timeout_ms):
    self.__timeout = timeout_ms
    self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout)

  @property
  def status(self):
    return self.__status

  @micropython.viper
  def __crc16(self, dat : ptr8, l:int) -> int:
    crc:int = 0
    pt = ptr16(self.__crc16_lutable)
    for i in range(l):
      crc = (crc << 8) ^ pt[((crc >> 8) ^ dat[i]) & 0xff]
    return crc & 0xffff

  @micropython.native
  def TxPacket(self, id : int, cmd : int, opt : int, param : bytes, echo = False) -> (bytes, bool):
    if ((id >= 0 and id <= 239) or id == BROADCASTING_ID) and len(param) <= (256 - 8):
      txp = bytes([0xfe,0xfe,id,len(param) + 8,cmd,opt]) + bytes(param)
      txp += W2Bs(self.__crc16(txp, len(txp)))
      _ = self.__serial.read(self.__serial.any())
      if echo: print('TX:', txp.hex(':'))
      self.__serial.write(txp)
      return txp, True
    return None, False

  @micropython.native
  def __rx(self, length) -> bytes:
    s = self.__serial.read(length)
    if s:
      l = len(s)
      if l == length:
        return s
      else:
        r = s
        length -= l
        while self.__serial.any() > 0:
          s = self.__serial.read(length)
          r += s
          length -= len(s)
          if length == 0:
            break
        return r
    else:
      return bytes()

  @micropython.native
  def RxPacket(self, echo = False) -> (bytes, bool):
    self.__serial.flush()
    rxp = self.__rx(6)
    if rxp:
      if len(rxp) == 6:
        l = rxp[3] - 6
        rxp += self.__rx(l)
        if len(rxp) == l + 6 and rxp[0] == 0xfe and rxp[1] == 0xfe and unpack('<H', rxp[-2:])[0] == self.__crc16(rxp[:-2], len(rxp[:-2])):
          self.__status = rxp[5]
          if echo: print('RX:', rxp.hex(':'))
          return bytes(rxp), True
    return None, False

  @micropython.native
  def MemWRITE(self, id : int, addr : int, data : bytes, echo = False) -> bool:
    if ((id >= 0 and id <= 239) or id == BROADCASTING_ID) and addr >= 0 and addr <= 0x4ff:
      if self.TxPacket(id, CMD_MemWRITE, 1, W2Bs(addr) + data, echo)[1]:
        if id != BROADCASTING_ID:
          d, r = self.RxPacket(echo)
          if r:
            return (d[2] == id) and (d[4] == 0x21)
        else:
          return True
    return False

  @micropython.native
  def MemWRITE8(self, id : int, addr : int, data : int | list | tuple, echo = False) -> bool:
    return self.MemWRITE(id, addr, B2Bs(data), echo)

  @micropython.native
  def MemWRITE16(self, id : int, addr : int, data : int | list | tuple, echo = False) -> bool:
    return self.MemWRITE(id, addr, W2Bs(data), echo)

  @micropython.native
  def MemWRITE32(self, id : int, addr : int, data : int | list | tuple, echo = False) -> bool:
    return self.MemWRITE(id, addr, L2Bs(data), echo)

  @micropython.native
  def MemREAD(self, id : int, addr : int, length : int, echo = False) -> bytes:
    if addr >= 0 and id <= 239 and addr >= 0 and addr <= 0x4ff and length > 0 and length > 0 and length <= 247:
      if self.TxPacket(id, CMD_MemREAD, 0, W2Bs(addr) + B2Bs(length), echo)[1]:
        d, r = self.RxPacket(echo)
        if r:
          if d[4] == 0x20 and id == d[2] and (d[5] & (4 + 8 + 0x10 + 0x40)) == 0:
            return bytes(d[6:-2])
    return None

  @micropython.native
  def MemREAD8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
    r = self.MemREAD(id, addr, length, echo)
    if r != None:
      n = sum(iter_unpack('b' if signed else 'B', r), ())
      return n if length > 1 else n[0]
    return None

  @micropython.native
  def MemREAD16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
    r = self.MemREAD(id, addr, 2 * length, echo)
    if r != None:
      n = sum(iter_unpack('h' if signed else 'H', r), ())
      return n if length > 1 else n[0]
    return None

  @micropython.native
  def MemREAD32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int:
    r = self.MemREAD(id, addr, 4 * length, echo)
    if r != None:
      n = sum(iter_unpack('i' if signed else 'I', r), ())
      return n if length > 1 else n[0]
    return None

  @micropython.native
  def LOAD(self, id : int, echo = False) -> bool:
    if self.TxPacket(id, CMD_LOAD, 0, (), echo)[1]:
      if id != BROADCASTING_ID:
        d, r = self.RxPacket(echo)
        if r:
          return id == d[2] and d[4] == 0x22
      else:
        return True;
    return False

  @micropython.native
  def SAVE(self, id : int, echo = False) -> bool:
    if self.TxPacket(id, CMD_SAVE, 0, (), echo)[1]:
      if id != BROADCASTING_ID:
        d, r = self.RxPacket(echo)
        if r:
          return id == d[2] and d[4] == 0x23
      else:
        return True;
    return False

  @micropython.native
  def MotorREAD(self, id : int, echo = False) -> tuple:
    if self.TxPacket(id, CMD_MotorREAD, 0, (), echo):
      if id != BROADCASTING_ID:
        dat, r = self.RxPacket(echo)
        if r:
          return tuple([dat[6], tuple(n[0] for n in iter_unpack('<h', dat[7:-2])) if len(dat[7:-2]) > 0 and dat[4] == 0x24 else ()])
        else:
          return None
      else:
        return ()
    return None

  @micropython.native
  def MotorWRITE(self, id : int, opt : int, dat : (), echo = False) -> tuple:
    if self.TxPacket(id, CMD_MotorWRITE, opt, W2Bs(dat), echo):
      if id != BROADCASTING_ID:
        dat, r = self.RxPacket(echo)
        if r:
          return tuple([dat[6], unpack('<'+'h'*(len(dat[7:-2])>>1), dat[7:-2]) if len(dat[7:-2]) > 0 and dat[4] == 0x25 else ()])
        else:
          return None
      else:
        return ()
    return None

  @micropython.native
  def SystemREAD(self, id : int, echo = False) -> tuple:
    if self.TxPacket(id, CMD_SystemREAD, 0, (), echo)[1]:
      d, r = self.RxPacket(echo)
      if r:
        if len(d[6:-2]) == 13 and d[4] == 0x3b:
          return unpack('<IIIB', d[6:-2])
    return None

  @micropython.native
  def SystemWRITE(self, id : int, data : (), echo = False) -> bool:
    if data[0] >= 0 and data[0] <= 239 and data[1] >= 0 and data[1] <= 7 and data[2] >= 0 and data[2] <= 2 and data[3] >= 1 and data[3] <= 255:
      d = self.SystemREAD(id, echo)
      if d:
        with self.__mutex:
          if self.TxPacket(id, CMD_SystemWRITE, 0xf, L2Bs(d[0]) + B2Bs(data), echo)[1]:
            d, r = self.RxPacket(echo)
            if r:
              return id == d[2] and d[4] == 0x3c and (d[5] & (4 + 8 + 0x10 + 0x40)) == 0
    return False

  @micropython.native
  def ReBoot(self, id : int, echo = False) -> bool:
    if id != BROADCASTING_ID:
      if self.TxPacket(id, CMD_ReBoot, 0, W2Bs(0), echo)[1]:
        d, r = self.RxPacket(echo)
        if r:
          return d[2] == id and d[4] == 0x3d
    return False

  @micropython.native
  def FactoryReset(self, id : int, echo = False) -> bool:
    if id != BROADCASTING_ID:
      d = self.SystemREAD(id, echo)
      if d != None:
        if self.TxPacket(id, CMD_FactoryReset, 0, L2Bs(d[0]), echo)[1]:
          d, r = self.RxPacket(echo)
          if r:
            return d[2] == id and d[4] == 0x3e
    return False

  @micropython.native
  def FullScan(self) -> tuple:
    orgtimeout = self.__timeout
    orgbaudrate = self.__baudrate
    #baud=(3000000,2000000,1500000,1250000,1000000,625000,115200,57600)
    baud=(57600,115200,625000,1000000,1250000,1500000,2000000,3000000)
    self.__serial.timeout = 0.001
    for b in baud:
      for id in range(240):
        self.__serial.baudrate = b
        r = self.MemREAD8(id, 0)
        print(f'baud:{b:7} id:{id:3} :', end='find\n' if r else 'none\r')
    self.__serial.timeout = orgtimeout
    self.__baudrate = orgbaudrate

##########################################################
# test code
##########################################################
if __name__ == "__main__":
  import time
  from pyPMX import *

  machine.freq(250000000) # set the CPU frequency to 250 MHz

  try:
    pmx = PMXProtocol()
  except:
    pass
  else:
    ID = 0

    # reboot
    print('ReBoot=', pmx.ReBoot(ID, echo=True))
    time.sleep(0.5)
    '''
    # factory reset
    print('FactoryReset=',pmx.FactoryReset(ID))
    time.sleep(0.5)
    '''
    # load
    print('LOAD=', pmx.LOAD(ID, echo=True))
    # save
    print('SAVE=', pmx.SAVE(ID, echo=True))
    '''
    # sys write
    print('SysWrite=',pmx.SystemWRITE(ID, (ID, pmx.SYSW_BAUD_115_2k, pmx.SYSW_PARITY_NONE, 20)))
    time.sleep(0.5)
    '''
    # sys read
    r = pmx.SystemREAD(ID, echo=True)
    print('SysRead=', f'${r[0]:08x} ${r[1]:08x} ${r[2]:08x} {r[3]:d} ' if r else 'False')
    # dump memory
    print('dump')
    for addr in range(0, 800, 20):
      r = pmx.MemREAD(ID, addr, 20)
      print(f' MemREAD({addr}:{20})=', r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!')
    # gain
    r = pmx.MemREAD(ID, 0, 64)
    print('gain\n MemREAD(0:64)=', unpack('<IIIIIIIIIIIIIIII', r) if r else '!!!!!!!!')
    # voltage limit
    r = pmx.MemREAD(ID, 76, 8)
    print('volt lim\n MemREAD(76:8)=', unpack('<HHHH', r) if r else '!!!!!!!!')
    # present value
    r = pmx.MemREAD(ID, 300, 24)
    print('present val\n MemREAD(300:20)=', unpack('<hhhhhhhhhHHH', r) if r else '!!!!!!!!')
    # error stat
    r = pmx.MemREAD(ID, 400, 6)
    print('err stat\n MemREAD(0:400:7)=', unpack('<BBBxH', r) if r else '!!!!!!!!')

    print('preparation for pos ctrl')
    # ctrl off

    print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ()))
    time.sleep(0.1)
    # 1:pos,2:speed,4:cur,8:torq,16:pwm,32:time
    print(' MemWRITE8(501)=', pmx.MemWRITE8(ID, 501, 1))
    print(' MemWRITE8(502)=', pmx.MemWRITE8(ID, 502, 0b11111))
    time.sleep(0.1)

    # goal angle
    print('start pos ctrl')
    print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_TORQUEON, ()))
    for ang in tuple(range(0, 32000, 500)) + tuple(range(32000, -32000, -500)) + tuple(range(-32000, 500, 500)):
      for n in range(10):
        r = pmx.MotorWRITE(ID, pmx.MOTW_OPT_NONE, (ang,))
        if r != None:
          if len(r) > 0:
            print(f' MotorWRITE= {r[0]},', ''.join('{:6},'.format(k) for k in r[1]), 'stat=', pmx.status, end = '\r')
            pmx.MemREAD(ID, 400, 6)
        else:
          pass
          print(' MotorWRITE= !!!!!!!!!!!!!!!!!!!!!!!!')
          break
        time.sleep(0.005)
      else:
        continue
      pass
      break
    else:
      print()
    time.sleep(0.1)
    print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ()))



    print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ()))
    time.sleep(0.1)
    # 1:pos,2:speed,4:cur,8:torq,16:pwm,32:time
    print(' MemWRITE8(501)=', pmx.MemWRITE8(ID, 501, 2))
    time.sleep(0.1)

    # goal angle
    print('start speed ctrl')
    print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_TORQUEON, ()))
    for spd in tuple(range(0, 2000, 50)) + tuple(range(2000, -2000, -50)) + tuple(range(-2000, 50, 50)):
      for n in range(10):
        r = pmx.MotorWRITE(ID, pmx.MOTW_OPT_NONE, (spd,))
        if r != None:
          if len(r) > 0:
            print(f' MotorWRITE= {r[0]},', ''.join('{:6},'.format(k) for k in r[1]), 'stat=', pmx.status, end = '\r')
            pmx.MemREAD(ID, 400, 6)
        else:
          pass
          print(' MotorWRITE= !!!!!!!!!!!!!!!!!!!!!!!!')
          break
        time.sleep(0.005)
      else:
        continue
      pass
      break
    else:
      print()
    time.sleep(0.1)
    print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ()))

    del pmx

  print('fin')

Front page   Diff ReloadPrint View   Page list Search Recent changes   RSS of recent changes (RSS 1.0) RSS of recent changes (RSS 2.0) RSS of recent changes (RSS Atom)
Last-modified: 2024-04-13 (Sat) 22:53:38 (JST) (68d)