Thursday, February 9, 2012

ATA Commands in Python

In software development sometimes you spend time on an implementation which you are unreasonably proud of, but ultimately decide not to use in the product. This is one such story.

I needed to retrieve information from an attached disk, such as its model and serial number. There are commands which can do this, like hdparm, sdparm, and smartctl, but initially I tried to avoid building in a dependency on any such tools by interrogating the hard drive directly. In pure Python.

Snicker all you want, it did work. My first implementation used an older Linux API to retrieve this information, the HDIO_GET_IDENTITY ioctl. This ioctl maps more or less directly to an ATA IDENTIFY or SCSI INQUIRY from the drive. The implementation uses the Python struct module to define the data structure sent along with the ioctl.

import fcntl
import struct


def GetDriveId(dev):
  """Return information from interrogating the drive.

  This routine issues an HDIO_GET_IDENTITY ioctl to a block device,
  which only root can do.

  Args:
    dev: name of the device, such as 'sda' or '/dev/sda'

  Returns:
    (serial_number, fw_version, model) as strings
  """
  # from /usr/include/linux/hdreg.h, struct hd_driveid
  # 10H = misc stuff, mostly deprecated
  # 20s = serial_no
  # 3H  = misc stuff
  # 8s  = fw_rev
  # 40s = model
  # ... plus a bunch more stuff we don't care about.
  struct_hd_driveid = '@ 10H 20s 3H 8s 40s'
  HDIO_GET_IDENTITY = 0x030d
  if dev[0] != '/':
    dev = '/dev/' + dev
  with open(dev, 'r') as fd:
    buf = fcntl.ioctl(fd, HDIO_GET_IDENTITY, ' ' * 512)
    fields = struct.unpack_from(struct_hd_driveid, buf)
    serial_no = fields[10].strip()
    fw_rev = fields[14].strip()
    model = fields[15].strip()
    return (serial_no, fw_rev, model)

No no wait, stop snickering, it does work! It has to run as root, which is one reason why I eventually abandoned this approach.

$ sudo python hdio.py
('5RY0N6BD', '3.ADA', 'ST3250310AS')
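
The field indices in that function follow from the struct format string, and that arithmetic can be checked without root or a real disk. Here is a minimal sketch, assuming Python 3 and a synthetic 512-byte buffer standing in for the ioctl result. The parse_driveid helper and the fake buffer are illustrative, not part of the original script.

```python
import struct

# Same layout as the format string used with HDIO_GET_IDENTITY.
STRUCT_HD_DRIVEID = '@ 10H 20s 3H 8s 40s'


def parse_driveid(buf):
    """Return (serial_no, fw_rev, model) from a raw hd_driveid buffer."""
    fields = struct.unpack_from(STRUCT_HD_DRIVEID, buf)
    # 10 shorts occupy indices 0-9, so serial_no is index 10;
    # 3 more shorts occupy 11-13, so fw_rev is 14 and model is 15.
    picked = (fields[10], fields[14], fields[15])
    return tuple(f.decode('ascii').strip() for f in picked)


# Build a fake buffer: zeroed shorts plus space-padded strings.
values = ([0] * 10 + [b'5RY0N6BD'.ljust(20)] + [0] * 3 +
          [b'3.ADA'.ljust(8), b'ST3250310AS'.ljust(40)])
fake = struct.pack(STRUCT_HD_DRIVEID, *values).ljust(512, b'\0')

print(struct.calcsize(STRUCT_HD_DRIVEID))  # bytes covered by the format
print(parse_driveid(fake))
```

Only the first 94 bytes of the buffer are described by the format; unpack_from ignores the rest.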

HDIO_GET_IDENTITY is deprecated in Linux 2.6, and logs a message saying that sg_io should be used instead. sg_io is an API to send SCSI commands to a device. It also didn't require my entire Python process to run as root; I'd "only" have to give it CAP_SYS_RAWIO. So of course I changed the implementation... still in Python. Stop snickering.


import ctypes
import fcntl


class AtaCmd(ctypes.Structure):
  """ATA Command Pass-Through
     http://www.t10.org/ftp/t10/document.04/04-262r8.pdf"""

  _fields_ = [
      ('opcode', ctypes.c_ubyte),
      ('protocol', ctypes.c_ubyte),
      ('flags', ctypes.c_ubyte),
      ('features', ctypes.c_ubyte),
      ('sector_count', ctypes.c_ubyte),
      ('lba_low', ctypes.c_ubyte),
      ('lba_mid', ctypes.c_ubyte),
      ('lba_high', ctypes.c_ubyte),
      ('device', ctypes.c_ubyte),
      ('command', ctypes.c_ubyte),
      ('reserved', ctypes.c_ubyte),
      ('control', ctypes.c_ubyte) ]


class SgioHdr(ctypes.Structure):
  """<scsi/sg.h> sg_io_hdr_t."""

  _fields_ = [
      ('interface_id', ctypes.c_int),
      ('dxfer_direction', ctypes.c_int),
      ('cmd_len', ctypes.c_ubyte),
      ('mx_sb_len', ctypes.c_ubyte),
      ('iovec_count', ctypes.c_ushort),
      ('dxfer_len', ctypes.c_uint),
      ('dxferp', ctypes.c_void_p),
      ('cmdp', ctypes.c_void_p),
      ('sbp', ctypes.c_void_p),
      ('timeout', ctypes.c_uint),
      ('flags', ctypes.c_uint),
      ('pack_id', ctypes.c_int),
      ('usr_ptr', ctypes.c_void_p),
      ('status', ctypes.c_ubyte),
      ('masked_status', ctypes.c_ubyte),
      ('msg_status', ctypes.c_ubyte),
      ('sb_len_wr', ctypes.c_ubyte),
      ('host_status', ctypes.c_ushort),
      ('driver_status', ctypes.c_ushort),
      ('resid', ctypes.c_int),
      ('duration', ctypes.c_uint),
      ('info', ctypes.c_uint)]

def SwapString(data):
  """Swap the bytes in each 16 bit word of a string.

  String data from an ATA IDENTIFY appears byteswapped, even on little-endian
  architectures. I don't know why. Other disk utilities I've looked at also
  byte-swap strings, and contain comments that this needs to be done on all
  platforms, not just big-endian ones. So... yeah.
  """
  swapped = []
  for x in range(0, len(data) - 1, 2):
    swapped.append(data[x+1])
    swapped.append(data[x])
  return ''.join(swapped).strip()

def GetDriveIdSgIo(dev):
  """Return information from interrogating the drive.

  This routine issues an SG_IO ioctl to a block device, which
  requires either root privileges or the CAP_SYS_RAWIO capability.

  Args:
    dev: name of the device, such as 'sda' or '/dev/sda'

  Returns:
    (serial_number, fw_version, model) as strings
  """

  if dev[0] != '/':
    dev = '/dev/' + dev
  ata_cmd = AtaCmd(opcode=0xa1,  # ATA PASS-THROUGH (12)
                   protocol=4<<1,  # PIO Data-In
                   # flags field
                   # OFF_LINE = 0 (0 seconds offline)
                   # CK_COND = 1 (copy sense data in response)
                   # T_DIR = 1 (transfer from the ATA device)
                   # BYT_BLOK = 1 (length is in blocks, not bytes)
                   # T_LENGTH = 2 (transfer length in the SECTOR_COUNT field)
                   flags=0x2e,
                   features=0, sector_count=0,
                   lba_low=0, lba_mid=0, lba_high=0,
                   device=0,
                   command=0xec,  # IDENTIFY
                   reserved=0, control=0)
  ASCII_S = 83
  SG_DXFER_FROM_DEV = -3
  sense = ctypes.c_buffer(64)
  identify = ctypes.c_buffer(512)
  sgio = SgioHdr(interface_id=ASCII_S, dxfer_direction=SG_DXFER_FROM_DEV,
                 cmd_len=ctypes.sizeof(ata_cmd),
                 mx_sb_len=ctypes.sizeof(sense), iovec_count=0,
                 dxfer_len=ctypes.sizeof(identify),
                 dxferp=ctypes.cast(identify, ctypes.c_void_p),
                 cmdp=ctypes.addressof(ata_cmd),
                 sbp=ctypes.cast(sense, ctypes.c_void_p), timeout=3000,
                 flags=0, pack_id=0, usr_ptr=None, status=0, masked_status=0,
                 msg_status=0, sb_len_wr=0, host_status=0, driver_status=0,
                 resid=0, duration=0, info=0)
  SG_IO = 0x2285  # <scsi/sg.h>
  with open(dev, 'r') as fd:
    if fcntl.ioctl(fd, SG_IO, ctypes.addressof(sgio)) != 0:
      print "fcntl failed"
      return None
    if ord(sense[0]) != 0x72 or ord(sense[8]) != 0x9 or ord(sense[9]) != 0xc:
      return None
    # IDENTIFY format as defined on pg 91 of
    # http://t13.org/Documents/UploadedDocuments/docs2006/D1699r3f-ATA8-ACS.pdf
    # Slices exclude the end index: words 10-19, 23-26 and 27-46.
    serial_no = SwapString(identify[20:40])
    fw_rev = SwapString(identify[46:54])
    model = SwapString(identify[54:94])
    return (serial_no, fw_rev, model)

For the unbelievers out there, this one works too.

$ sudo python sgio.py
('5RY0N6BD', '3.ADA', 'ST3250310AS')
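
The parts of the SG_IO version that don't touch hardware can be exercised on their own. What follows is a sketch, assuming Python 3 and a synthetic IDENTIFY buffer. The names ata_string, to_ata_bytes, parse_identify and pass_through_flags are illustrative, not part of the original script. It reads the full byte ranges for IDENTIFY words 10-19 (serial number), 23-26 (firmware revision) and 27-46 (model number), and rebuilds the flags byte from its bit fields.

```python
def ata_string(raw):
    """Decode an ATA string by swapping the two bytes of each 16-bit word."""
    if len(raw) % 2:
        raise ValueError('ATA strings are a whole number of 16-bit words')
    swapped = bytearray(len(raw))
    swapped[0::2] = raw[1::2]
    swapped[1::2] = raw[0::2]
    return swapped.decode('ascii').strip()


def to_ata_bytes(text, width):
    """Encode text the way IDENTIFY stores it (helper for the fake buffer)."""
    padded = text.ljust(width).encode('ascii')
    out = bytearray(width)
    out[0::2] = padded[1::2]
    out[1::2] = padded[0::2]
    return bytes(out)


def parse_identify(identify):
    """Return (serial_no, fw_rev, model) from a 512-byte IDENTIFY response."""
    serial_no = ata_string(identify[20:40])  # words 10-19
    fw_rev = ata_string(identify[46:54])     # words 23-26
    model = ata_string(identify[54:94])      # words 27-46
    return (serial_no, fw_rev, model)


def pass_through_flags(off_line, ck_cond, t_dir, byt_blok, t_length):
    """Pack the flags byte of an ATA PASS-THROUGH (12) command."""
    return ((off_line << 6) | (ck_cond << 5) | (t_dir << 3) |
            (byt_blok << 2) | t_length)


# Fake IDENTIFY data; a real buffer would come back from the SG_IO ioctl.
fake = bytearray(512)
fake[20:40] = to_ata_bytes('5RY0N6BD', 20)
fake[46:54] = to_ata_bytes('3.ADA', 8)
fake[54:94] = to_ata_bytes('ST3250310AS', 40)

print(parse_identify(bytes(fake)))
print(hex(pass_through_flags(0, 1, 1, 1, 2)))
```

Working on bytes rather than str keeps the swap independent of any text encoding, which matters in Python 3 where the ioctl buffer is no longer a string.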

So, there you go. HDIO_GET_IDENTITY and SG_IO implemented in pure Python. They do work, but in the process of working on this and reading existing code it became clear that low-level ATA handling is fraught with peril. The disk industry has been iterating on this interface for decades, and there is a ton of gear out there that made questionable choices in how to interpret the spec. Most of the code in existing utilities is there not to implement the base operations but to handle the quirks of various manufacturers. I decided that I didn't want to go down that path, and will instead rely on forking sdparm and smartctl processes as needed.
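
For comparison, shelling out looks roughly like the following. This is a sketch, assuming Python 3.7 or later, that smartctl is installed, and that `smartctl -i` prints "Key: value" lines labelled "Device Model", "Serial Number" and "Firmware Version" for ATA drives. Those labels are an assumption that may not hold for other drive types or smartmontools versions, and the sample text is made up for illustration. The function names are hypothetical.

```python
import subprocess


def parse_key_values(text):
    """Parse 'Key: value' lines into a dict, ignoring other lines."""
    info = {}
    for line in text.splitlines():
        key, sep, value = line.partition(':')
        if sep and value.strip():
            info[key.strip()] = value.strip()
    return info


def get_drive_id_smartctl(dev):
    """Run 'smartctl -i' on dev (usually needs root) and pick three fields."""
    if not dev.startswith('/'):
        dev = '/dev/' + dev
    # smartctl uses its exit status as a bitmask, so don't treat
    # a nonzero status as fatal here.
    result = subprocess.run(['smartctl', '-i', dev], capture_output=True,
                            text=True, check=False)
    info = parse_key_values(result.stdout)
    # Assumed labels for ATA drives; missing ones come back as None.
    return (info.get('Serial Number'), info.get('Firmware Version'),
            info.get('Device Model'))


# Made-up sample in the assumed format, so the parser runs without a disk.
SAMPLE = """\
Device Model:     ST3250310AS
Serial Number:    5RY0N6BD
Firmware Version: 3.ADA
"""
print(parse_key_values(SAMPLE))
```

All of the manufacturer quirks stay inside smartctl this way; the Python side only has to cope with changes in the text output.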

I'll just leave this post here for search engines to find. I'm sure there is a ton of demand for this information.

Stop snickering.