Hope to have what you need

How to create a BLE service

    Python

Due to the needs of the Project, I need to build a BLE service and use Python. But since I haven't done it before, I first searched the Internet for any relevant cases. Unfortunately, there is basically no relevant content online. The network only has a large number of classic Bluetooth cases. So when I solved this problem, I shared the method.

You can from here to get a demo. Then, you need unzip the file.

Execute BLE GATT server example code.

1
./bluez-5.43/test/example_gatt_server

check the output, it should be like this:

1
2
3
4
5
6
$ ./bluez/bluez-5.43/test/example-gatt-server
Registering GATT application...
GetManagedObjects
GATT application registered
Battery Level drained: 98
Battery Level drained: 96

Now, the GATT server is running. Since the example code implements “Fake Battery service that emulates a draining battery”, it outputs “Battery Level drained” message on the console every 5 seconds.

Next I will tell you how to use the demo. We just need example_gatt_server, example_gatt_client and example_advertisement, you can copy them to your project fold. You need to change their name to add .py suffix.

Then we need to create our own BLE server file, e.g: my_ble_server.py.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import sys
sys.path.append('.')
import dbus, dbus.mainloop.glib
from gi.repository import GLib
try:
from gi.repository import GObject
except ImportError:
import gobject as GObject

from ai.bt.example_advertisement import Advertisement
from ai.bt.example_advertisement import register_ad_cb, register_ad_error_cb
from ai.bt.example_gatt_server import Service, Characteristic
from ai.bt.example_gatt_server import register_app_cb, register_app_error_cb

BLUEZ_SERVICE_NAME = 'org.bluez'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
UART_SERVICE_UUID = '6e400001-b5a3-f393-00a9-e50e24dcca9e'
UART_TX_CHARACTERISTIC_UUID = '6e400003-b5a3-f393-00a9-e50e24dcca9e'
UART_SE_CHARACTERISTIC_UUID = '6e400006-b5a3-f393-00a9-e50e24dcca9e'
mainloop = None


class DemoCharacteristic(Characteristic):

def __init__(self, bus, index, service):
Characteristic.__init__(self, bus, index, UART_TX_CHARACTERISTIC_UUID,
['read', 'write', 'notify'], service)
self.logger = ai.coloredlogging.get_logger("Play",
ai.coloredlogging.WHITE)
self.logger.addHandler(logging.NullHandler())

self.notifying = False
#GLib.io_add_watch(sys.stdin, GLib.IO_IN, self.notify_voic_command)

def WriteValue(self, value, options):
"""
Let client to write data to here.
Receive notify type data to change the notify type.
Receive command to enable and disable the control mode.
Receive action command to control MarsCat.
"""
s = bytes(value).decode()
print(s)

def ReadValue(self, options):
"""
Let client to read data from here.
"""
return [dbus.Byte(1)]

def StartNotify(self):
if self.notifying:
self.logger.debug('Already notifying, nothing to do')
return

self.notifying = True
self.toggle_notification()
print('start notify')

def StopNotify(self):
if not self.notifying:
self.logger.debug('Not notifying, nothing to do')
return

self.notifying = False
self.toggle_notification()
print('stop notify')

def notify_any(self):
if not self.notifying:
return

send_dict = {}

send_dict['voice'] = 'hello'
send_dict['vision'] = 'face'
send_dict['touch'] = 'head'
send_dict['distance'] = '199'
send_dict['gyro'] = '0-0'

data = json.dumps(send_dict)
print('data : [{}]'.format(data))
value = []
for b in data:
value.append(dbus.Byte(b.encode()))

self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': value}, [])

def do_notify(self):
"""
According the receive command to change the notify data.
"""

self.notify_any()
return True

def toggle_notification(self):
if not self.notifying:
return

# each 1s notify one time.
GObject.timeout_add(1000, self.do_notify)


class TestCharacteristic(Characteristic):
def __init__(self, bus, index, service):
Characteristic.__init__(self, bus, index, UART_SE_CHARACTERISTIC_UUID,
['read'], service)
self.logger = ai.coloredlogging.get_logger("State",
ai.coloredlogging.RED)
self.logger.addHandler(logging.NullHandler())

def ReadValue(self, options):
v = 'abcdefg'
value = []
for b in v:
value.append(dbus.Byte(b.encode()))
return value


class UartService(Service):
def __init__(self, bus, index):
Service.__init__(self, bus, index, UART_SERVICE_UUID, True)
self.add_characteristic(DemoCharacteristic(bus, 0, self))
self.add_characteristic(TestCharacteristic(bus, 1, self))


class Application(dbus.service.Object):
def __init__(self, bus):
self.path = '/'
self.services = []
dbus.service.Object.__init__(self, bus, self.path)

def get_path(self):
return dbus.ObjectPath(self.path)

def add_service(self, service):
self.services.append(service)

@dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
def GetManagedObjects(self):
response = {}
for service in self.services:
response[service.get_path()] = service.get_properties()
chrcs = service.get_characteristics()
for chrc in chrcs:
response[chrc.get_path()] = chrc.get_properties()
return response


class UartApplication(Application):
def __init__(self, bus):
Application.__init__(self, bus)
self.add_service(UartService(bus, 0))


class UartAdvertisement(Advertisement):
def __init__(self, bus, index):
Advertisement.__init__(self, bus, index, 'peripheral')
self.add_service_uuid(UART_SERVICE_UUID)
self.add_local_name(ai.parameters.CAT_BLE_NAME)
self.include_tx_power = True


def find_adapter(bus):
remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
DBUS_OM_IFACE)
objects = remote_om.GetManagedObjects()
for o, props in objects.items():
if LE_ADVERTISING_MANAGER_IFACE in props and GATT_MANAGER_IFACE in props:
return o
print('Skip adapter:', o)
return None


def main():
global mainloop
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
adapter = find_adapter(bus)
if not adapter:
print('BLE adapter not found')
return

service_manager = dbus.Interface(
bus.get_object(BLUEZ_SERVICE_NAME, adapter), GATT_MANAGER_IFACE)
ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
LE_ADVERTISING_MANAGER_IFACE)

app = UartApplication(bus)
adv = UartAdvertisement(bus, 0)

mainloop = GLib.MainLoop()

service_manager.RegisterApplication(app.get_path(), {},
reply_handler=register_app_cb,
error_handler=register_app_error_cb)
ad_manager.RegisterAdvertisement(adv.get_path(), {},
reply_handler=register_ad_cb,
error_handler=register_ad_error_cb)
try:
mainloop.run()
except KeyboardInterrupt:
adv.Release()


if __name__ == '__main__':
main()

You can also find other usages from the demo. You can at here find my project demo.

page PV:  ・  site PV:  ・  site UV: