from bluepy.btle import *
import logging
import binascii
import time
[docs]class SimpleBleClient(object):
"""This is a class implementation of a simple BLE client.
:param iface: The Bluetooth interface on which to make the connection. On Linux, 0 means `/dev/hci0`, 1 means `/dev/hci1` and so on., defaults to 0
:type iface: int, optional
:param scanCallback: A function handle of the form ``callback(client, device, isNewDevice, isNewData)``, where ``client`` is a handle to the :class:`simpleble.SimpleBleClient` that invoked the callback and ``device`` is the detected :class:`simpleble.SimpleBleDevice` object. ``isNewDev`` is `True` if the device (as identified by its MAC address) has not been seen before by the scanner, and `False` otherwise. ``isNewData`` is `True` if new or updated advertising data is available, defaults to None
:type scanCallback: function, optional
:param notificationCallback: A function handle of the form ``callback(client, characteristic, data)``, where ``client`` is a handle to the :class:`simpleble.SimpleBleClient` that invoked the callback, ``characteristic`` is the notified :class:`bluepy.blte.Characteristic` object and data is a `bytearray` containing the updated value. Defaults to None
:type notificationCallback: function, optional
"""
def __init__(self, iface=0, scanCallback=None, notificationCallback=None):
"""Constructor method
"""
self._scanner = Scanner(iface) if(scanCallback is None)\
else Scanner().withDelegate(SimpleBleScanDelegate(scanCallback, self))
self._iface = iface
self._discoveredDevices = []
self._characteristics = []
self._connected = False
self._connectedDevice = None
self._notificationCallback = None
[docs] def setScanCallback(self, callback):
"""Set the callback function to be executed when a device is detected by the client.
:param callback: A function handle of the form ``callback(client, device, isNewDevice, isNewData)``, where ``client`` is a handle to the :class:`simpleble.SimpleBleClient` that invoked the callback and ``device`` is the detected :class:`simpleble.SimpleBleDevice` object. ``isNewDev`` is `True` if the device (as identified by its MAC address) has not been seen before by the scanner, and `False` otherwise. ``isNewData`` is `True` if new or updated advertising data is available.
:type callback: function
"""
self._scanner.withDelegate(
SimpleBleScanDelegate(callback, client=self))
[docs] def setNotificationCallback(self, callback):
"""Set the callback function to be executed when a device sends a notification to the client.
:param callback: A function handle of the form ``callback(client, characteristic, data)``, where ``client`` is a handle to the :class:`simpleble.SimpleBleClient` that invoked the callback, ``characteristic`` is the notified :class:`bluepy.blte.Characteristic` object and data is a `bytearray` containing the updated value. Defaults to None
:type callback: function, optional
"""
if(self._connectedDevice is not None):
self._connectedDevice.setNotificationCallback(callback)
self._notificationCallback = callback
[docs] def scan(self, timeout=10.0):
"""Scans for and returns detected nearby devices
:param timeout: Specify how long (in seconds) the scan should last, defaults to 10.0
:type timeout: float, optional
:return: List of :class:`simpleble.SimpleBleDevice` objects
:rtype: list
"""
self._discoveredDevices = []
scanEntries = self._scanner.scan(timeout)
for scanEntry in scanEntries:
self._discoveredDevices.append(
SimpleBleDevice(
client=self,
addr=scanEntry.addr,
iface=scanEntry.iface,
data=scanEntry.getScanData(),
rssi=scanEntry.rssi,
connectable=scanEntry.connectable, updateCount=scanEntry.updateCount
)
)
return self._discoveredDevices
[docs] def connect(self, device):
"""Attempts to connect client to a given :class:`simpleble.SimpleBleDevice` object and returns a bool indication of the result.
:param device: An instance of the device to which we want to connect. Normally acquired by calling :meth:`simpleble.SimpleBleClient.scan` or :meth:`simpleble.SimpleBleClient.searchDevice`
:type device: SimpleBleDevice
:return: `True` if connection was successful, `False` otherwise
:rtype: bool
"""
self._connected = device.connect()
if(self._connected):
self._connectedDevice = device
if(self._notificationCallback is not None):
self._connectedDevice.setNotificationCallback(
self._notificationCallback)
return self._connected
[docs] def disconnect(self):
"""Drops existing connection.
Note that the current version of the project assumes that the client can be connected to at most one device at a time.
"""
self._connectedDevice.disconnect()
try:
self._scanner.stop()
except:
pass
self._connectedDevice = None
self._connected = False
[docs] def isConnected(self):
"""Check to see if client is connected to a device
:return: `True` if connected, `False` otherwise
:rtype: bool
"""
return self._connected
[docs] def getCharacteristics(self, startHnd=1, endHnd=0xFFFF, uuids=None):
"""Returns a list containing :class:`bluepy.btle.Characteristic` objects for the peripheral. If no arguments are given, will return all characteristics. If startHnd and/or endHnd are given, the list is restricted to characteristics whose handles are within the given range.
:param startHnd: Start index, defaults to 1
:type startHnd: int, optional
:param endHnd: End index, defaults to 0xFFFF
:type endHnd: int, optional
:param uuids: a list of UUID strings, defaults to None
:type uuids: list, optional
:return: List of returned :class:`bluepy.btle.Characteristic` objects
:rtype: list
"""
self._characteristics = self._connectedDevice.getCharacteristics(
startHnd, endHnd, uuids
)
return self._characteristics
[docs] def readCharacteristic(self, characteristic=None, uuid=None):
"""Reads the current value of the characteristic identified by either a :class:`bluepy.btle.Characteristic` object ``characteristic``, or a UUID string ``uuid``. If both are provided, then the characteristic will be read on the basis of the ``characteristic`` object. A :class:`bluepy.btle.BTLEException.GATT_ERROR` is raised if no inputs are specified or the requested characteristic was not found.
:param characteristic: A :class:`bluepy.btle.Characteristic` object, defaults to None
:type characteristic: :class:`bluepy.btle.Characteristic`, optional
:param uuid: A given UUID string, defaults to None
:type uuid: string, optional
:raises: :class:`bluepy.btle.BTLEException.GATT_ERROR`: If no inputs are specified or the requested characteristic was not found.
:return: The value read from the characteristic
:rtype: bytearray
"""
if(characteristic is None and uuid is not None):
characteristic = self._connectedDevice.getCharacteristic(
uuids=[uuid])[0]
if(characteristic is None):
raise BTLEException(
BTLEException.GATT_ERROR, "Characteristic was either not found, given the UUID, or not specified")
return self._connectedDevice.readCharacteristic(
characteristic.getHandle())
[docs] def writeCharacteristic(self, val, characteristic=None,
uuid=None, withResponse=False):
"""Writes the data val (of type str on Python 2.x, byte on 3.x) to the characteristic identified by either a :class:`bluepy.btle.Characteristic` object ``characteristic``, or a UUID string ``uuid``. If both are provided, then the characteristic will be read on the basis of the ``characteristic`` object. A :class:`bluepy.btle.BTLEException.GATT_ERROR` is raised if no inputs are specified or the requested characteristic was not found. If ``withResponse`` is `True`, the client will await confirmation that the write was successful from the device.
:param val: Value to be written in characteristic
:type val: str on Python 2.x, byte on 3.x
:param characteristic: A :class:`bluepy.btle.Characteristic` object, defaults to None
:type characteristic: :class:`bluepy.btle.Characteristic`, optional
:param uuid: A given UUID string, defaults to None
:type uuid: string, optional
:param withResponse: If ``withResponse`` is `True`, the client will await confirmation that the write was successful from the device, defaults to False
:type withResponse: bool, optional
:raises: :class:`bluepy.btle.BTLEException.GATT_ERROR`: If no inputs are specified or the requested characteristic was not found.
:return: `True` or `False` indicating success or failure of write operation, in the case that ``withResponce`` is `True`
:rtype: bool
"""
if(characteristic is None and uuid is not None):
characteristic = device.getCharacteristic(uuids=[uuid])
if(characteristic is None):
raise BTLEException(
BTLEException.GATT_ERROR, "Characteristic was either not found, given the UUID, or not specified")
return self._connectedDevice.writeCharacteristic(characteristic.getHandle(), val, withResponse)
[docs] def searchDevice(self, name=None, mac=None, timeout=10.0):
"""Searches for and returns, given it exists, a :class:`simpleble.SimpleBleDevice` device objects, based on the provided ``name`` and/or ``mac`` address. If both a ``name`` and a ``mac`` are provided, then the client will only return a device that matches both conditions.
:par am name: The "Complete Local Name" Generic Access Attribute (GATT) of the device, defaults to None
:type name: str, optional
:param mac: The MAC address of the device, defaults to None
:type mac: str, optional
:param timeout: Specify how long (in seconds) the scan should last, defaults to 10.0. Internally, it serves as an input to the invoked :meth:`simpleble.SimpleBleClient.scan` method.
:type timeout: float, optional
:raises AssertionError: If neither a ``name`` nor a ``mac`` inputs have been provided
:return: A :class:`simpleble.SimpleBleDevice` object if search was succesfull, None otherwise
:rtype: :class:`simpleble.SimpleBleDevice` | None
"""
try:
check = not (name is None)
chekc = not (mac is None)
assert check or chekc
except AssertionError as e:
print("Either a name or a mac address must be provided to find a device!")
raise e
mode = 0
if(name is not None):
mode += 1
if(mac is not None):
mode += 1
# Perform initial detection attempt
self.scan(timeout)
for device in self._discoveredDevices:
found = 0
if (device.addr == mac):
found += 1
for (adtype, desc, value) in device.data:
if (adtype == 9 and value == name):
found += 1
if(found >= mode):
return device
return None
[docs] def printFoundDevices(self):
"""Print all devices discovered during the last scan. Should only be called after a :meth:`simpleble.SimpleBleClient.scan` has been called first.
"""
for device in self._discoveredDevices:
print("Device %s (%s), RSSI=%d dB" %
(device.addr, device.addrType, device.rssi))
for (adtype, desc, value) in device.data:
print(" %s = %s" % (desc, value))
[docs]class SimpleBleScanDelegate(DefaultDelegate):
def __init__(self, callback, client=None):
super().__init__()
self.callback = callback
self.client = client
[docs] def handleDiscovery(self, scanEntry, isNewDevice, isNewData):
device = SimpleBleDevice(
client=self.client,
addr=scanEntry.addr,
iface=scanEntry.iface,
data=scanEntry.getScanData(),
rssi=scanEntry.rssi,
connectable=scanEntry.connectable, updateCount=scanEntry.updateCount
)
self.callback(self.client, device, isNewDevice, isNewData)
[docs]class SimpleBleNotificationDelegate(DefaultDelegate):
def __init__(self, callback, client):
super().__init__()
self.callback = callback
self.client = client
[docs] def handleNotification(self, characteristic, data):
self.callback(self.client, characteristic, data)
[docs]class SimpleBleDevice(Peripheral):
"""This is a conceptual class representation of a simple BLE device (GATT Server). It is essentially an extended combination of the :class:`bluepy.btle.Peripheral` and :class:`bluepy.btle.ScanEntry` classes
:param client: A handle to the :class:`simpleble.SimpleBleClient` client object that detected the device
:type client: class:`simpleble.SimpleBleClient`
:param addr: Device MAC address, defaults to None
:type addr: str, optional
:param addrType: Device address type - one of ADDR_TYPE_PUBLIC or ADDR_TYPE_RANDOM, defaults to ADDR_TYPE_PUBLIC
:type addrType: str, optional
:param iface: Bluetooth interface number (0 = /dev/hci0) used for the connection, defaults to 0
:type iface: int, optional
:param data: A list of tuples (adtype, description, value) containing the AD type code, human-readable description and value for all available advertising data items, defaults to None
:type data: list, optional
:param rssi: Received Signal Strength Indication for the last received broadcast from the device. This is an integer value measured in dB, where 0 dB is the maximum (theoretical) signal strength, and more negative numbers indicate a weaker signal, defaults to 0
:type rssi: int, optional
:param connectable: `True` if the device supports connections, and `False` otherwise (typically used for advertising ‘beacons’)., defaults to `False`
:type connectable: bool, optional
:param updateCount: Integer count of the number of advertising packets received from the device so far, defaults to 0
:type updateCount: int, optional
"""
def __init__(self, client, addr=None, addrType=ADDR_TYPE_PUBLIC, iface=0, data=None, rssi=0, connectable=False, updateCount=0):
"""Constructor method
"""
super().__init__(deviceAddr=None, addrType=addrType, iface=iface)
self.addr = addr
self.addrType = addrType
self.iface = iface
self.rssi = rssi
self.connectable = connectable
self.updateCount = updateCount
self.data = data
self._connected = False
self._services = []
self._characteristics = []
self._client = client
[docs] def getServices(self, uuids=None):
"""Returns a list of :class:`bluepy.blte.Service` objects representing the services offered by the device. This will perform Bluetooth service discovery if this has not already been done; otherwise it will return a cached list of services immediately..
:param uuids: A list of string service UUIDs to be discovered, defaults to None
:type uuids: list, optional
:return: A list of the discovered :class:`bluepy.blte.Service` objects, which match the provided ``uuids``
:rtype: list On Python 3.x, this returns a dictionary view object, not a list
"""
self._services = []
if(uuids is not None):
for uuid in uuids:
try:
service = self.getServiceByUUID(uuid)
self.services.append(service)
except BTLEException:
pass
else:
self._services = super().getServices()
return self._services
[docs] def setNotificationCallback(self, callback):
"""Set the callback function to be executed when the device sends a notification to the client.
:param callback: A function handle of the form ``callback(client, characteristic, data)``, where ``client`` is a handle to the :class:`simpleble.SimpleBleClient` that invoked the callback, ``characteristic`` is the notified :class:`bluepy.blte.Characteristic` object and data is a `bytearray` containing the updated value. Defaults to None
:type callback: function, optional
"""
self.withDelegate(
SimpleBleNotificationDelegate(
callback,
client=self._client
)
)
[docs] def getCharacteristics(self, startHnd=1, endHnd=0xFFFF, uuids=None):
"""Returns a list containing :class:`bluepy.btle.Characteristic` objects for the peripheral. If no arguments are given, will return all characteristics. If startHnd and/or endHnd are given, the list is restricted to characteristics whose handles are within the given range.
:param startHnd: Start index, defaults to 1
:type startHnd: int, optional
:param endHnd: End index, defaults to 0xFFFF
:type endHnd: int, optional
:param uuids: a list of UUID strings, defaults to None
:type uuids: list, optional
:return: List of returned :class:`bluepy.btle.Characteristic` objects
:rtype: list
"""
self._characteristics = []
if(uuids is not None):
for uuid in uuids:
try:
characteristic = super().getCharacteristics(
startHnd, endHnd, uuid)[0]
self._characteristics.append(characteristic)
except BTLEException:
pass
else:
self._characteristics = super().getCharacteristics(startHnd, endHnd)
return self._characteristics
[docs] def connect(self):
"""Attempts to initiate a connection with the device.
:return: `True` if connection was successful, `False` otherwise
:rtype: bool
"""
try:
super().connect(self.addr, addrType=self.addrType, iface=self.iface)
except BTLEException as ex:
self._connected = False
return (False, ex)
self._connected = True
return True
[docs] def disconnect(self):
"""Drops existing connection to device
"""
super().disconnect()
self._connected = False
[docs] def isConnected(self):
"""Checks to see if device is connected
:return: `True` if connected, `False` otherwise
:rtype: bool
"""
return self._connected
[docs] def printInfo(self):
"""Print info about device
"""
print("Device %s (%s), RSSI=%d dB" %
(self.addr, self.addrType, self.rssi))
for (adtype, desc, value) in self.data:
print(" %s = %s" % (desc, value))
if __name__ == "__main__":
"""This example demonstrates a simple BLE client that scans for devices,
connects to a device (GATT server) of choice and continuously reads a
characteristic on that device.
"""
# The UUID of the characteristic we want to read and the name of the device # we want to read it from
Characteristic_UUID = "beb5483e-36e1-4688-b7f5-ea07361b26a8"
Device_Name = "MyESP32"
# Define our scan and notification callback methods
def myScanCallback(client, device, isNewDevice, isNewData):
client._yes = True
print("#MAC: " + device.addr + " #isNewDevice: " +
str(isNewDevice) + " #isNewData: " + str(isNewData))
# TODO: NOTIFICATIONS ARE NOT SUPPORTED YET
# def myNotificationCallback(client, characteristic, data):
# print("Notification received!")
# print(" Characteristic UUID: " + characteristic.uuid)
# print(" Data: " + str(data))
# Instantiate a SimpleBleClient and set it's scan callback
bleClient = SimpleBleClient()
bleClient.setScanCallback(myScanCallback)
# TODO: NOTIFICATIONS ARE NOT SUPPORTED YET
# bleClient.setNotificationCallback(myNotificationCollback)
# Error handling to detect Keyboard interrupt (Ctrl+C)
# Loop to ensure we can survive connection drops
while(not bleClient.isConnected()):
try:
# Search for 2 seconds and return a device of interest if found.
# Internally this makes a call to bleClient.scan(timeout), thus
# triggering the scan callback method when nearby devices are detected
device = bleClient.searchDevice(name="MyESP32", timeout=2)
if(device is not None):
# If the device was found print out it's info
print("Found device!!")
device.printInfo()
# Proceed to connect to the device
print("Proceeding to connect....")
if(bleClient.connect(device)):
# Have a peek at the services provided by the device
services = device.getServices()
for service in services:
print("Service ["+str(service.uuid)+"]")
# Check to see if the device provides a characteristic with the
# desired UUID
counter = bleClient.getCharacteristics(
uuids=[Characteristic_UUID])[0]
if(counter):
# If it does, then we proceed to read its value every second
while(True):
# Error handling ensures that we can survive from
# potential connection drops
try:
# Read the data as bytes and convert to string
data_bytes = bleClient.readCharacteristic(
counter)
data_str = "".join(map(chr, data_bytes))
# Now print the data and wait for a second
print("Data: " + data_str)
time.sleep(1.0)
except BTLEException as e:
# If we get disconnected from the device, keep
# looping until we have reconnected
if(e.code == BTLEException.DISCONNECTED):
bleClient.disconnect()
print(
"Connection to BLE device has been lost!")
break
# while(not bleClient.isConnected()):
# bleClient.connect(device)
else:
print("Could not connect to device! Retrying in 3 sec...")
time.sleep(3.0)
else:
print("Device not found! Retrying in 3 sec...")
time.sleep(3.0)
except BTLEException as e:
# If we get disconnected from the device, keep
# looping until we have reconnected
if(e.code == BTLEException.DISCONNECTED):
bleClient.disconnect()
print(
"Connection to BLE device has been lost!")
break
except KeyboardInterrupt as e:
# Detect keyboard interrupt and close down
# bleClient gracefully
bleClient.disconnect()
raise e