Skip to content

Commit 568ce70

Browse files
committed
PubSub: Allow specifying custom encoder
1 parent b785498 commit 568ce70

9 files changed

+109
-30
lines changed

src/socketio/asyncio_aiopika_manager.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover
3838
name = 'asyncaiopika'
3939

4040
def __init__(self, url='amqp://guest:guest@localhost:5672//',
41-
channel='socketio', write_only=False, logger=None):
41+
channel='socketio', write_only=False, logger=None,
42+
encoder=pickle):
4243
if aio_pika is None:
4344
raise RuntimeError('aio_pika package is not installed '
4445
'(Run "pip install aio_pika" in your '
@@ -70,7 +71,7 @@ async def _publish(self, data):
7071
channel = await self._channel(connection)
7172
exchange = await self._exchange(channel)
7273
await exchange.publish(
73-
aio_pika.Message(body=pickle.dumps(data),
74+
aio_pika.Message(body=self.encoder.dumps(data),
7475
delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
7576
routing_key='*'
7677
)
@@ -94,7 +95,7 @@ async def _listen(self):
9495
async with self.listener_queue.iterator() as queue_iter:
9596
async for message in queue_iter:
9697
with message.process():
97-
yield pickle.loads(message.body)
98+
yield message.body
9899
except Exception:
99100
self._get_logger().error('Cannot receive from rabbitmq... '
100101
'retrying in '

src/socketio/asyncio_pubsub_manager.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@ class AsyncPubSubManager(AsyncManager):
2424
"""
2525
name = 'asyncpubsub'
2626

27-
def __init__(self, channel='socketio', write_only=False, logger=None):
27+
def __init__(self, channel='socketio', write_only=False, logger=None,
28+
encoder=pickle):
2829
super().__init__()
2930
self.channel = channel
3031
self.write_only = write_only
3132
self.host_id = uuid.uuid4().hex
3233
self.logger = logger
34+
self.encoder = encoder
3335

3436
def initialize(self):
3537
super().initialize()
@@ -153,7 +155,13 @@ async def _thread(self):
153155
if isinstance(message, dict):
154156
data = message
155157
else:
156-
if isinstance(message, bytes): # pragma: no cover
158+
if self.encoder:
159+
try:
160+
data = self.encoder.loads(message)
161+
except:
162+
pass
163+
if data is None and \
164+
isinstance(message, bytes): # pragma: no cover
157165
try:
158166
data = pickle.loads(message)
159167
except:

src/socketio/asyncio_redis_manager.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,14 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
3232
and receiving.
3333
:param redis_options: additional keyword arguments to be passed to
3434
``aioredis.from_url()``.
35+
:param encoder: The encoder to use for publishing and decoding data,
36+
defaults to pickle.
3537
"""
3638
name = 'aioredis'
3739

3840
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
39-
write_only=False, logger=None, redis_options=None):
41+
write_only=False, logger=None, redis_options=None,
42+
encoder=pickle):
4043
if aioredis is None:
4144
raise RuntimeError('Redis package is not installed '
4245
'(Run "pip install aioredis" in your '
@@ -46,7 +49,8 @@ def __init__(self, url='redis://localhost:6379/0', channel='socketio',
4649
self.redis_url = url
4750
self.redis_options = redis_options or {}
4851
self._redis_connect()
49-
super().__init__(channel=channel, write_only=write_only, logger=logger)
52+
super().__init__(channel=channel, write_only=write_only, logger=logger,
53+
encoder=encoder)
5054

5155
def _redis_connect(self):
5256
self.redis = aioredis.Redis.from_url(self.redis_url,
@@ -60,7 +64,7 @@ async def _publish(self, data):
6064
if not retry:
6165
self._redis_connect()
6266
return await self.redis.publish(
63-
self.channel, pickle.dumps(data))
67+
self.channel, self.encoder.dumps(data))
6468
except aioredis.exceptions.RedisError:
6569
if retry:
6670
self._get_logger().error('Cannot publish to redis... '

src/socketio/kafka_manager.py

+7-4
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,21 @@ class KafkaManager(PubSubManager): # pragma: no cover
3333
:param write_only: If set to ``True``, only initialize to emit events. The
3434
default of ``False`` initializes the class for emitting
3535
and receiving.
36+
:param encoder: The encoder to use for publishing and decoding data,
37+
defaults to pickle.
3638
"""
3739
name = 'kafka'
3840

3941
def __init__(self, url='kafka://localhost:9092', channel='socketio',
40-
write_only=False):
42+
write_only=False, encoder=pickle):
4143
if kafka is None:
4244
raise RuntimeError('kafka-python package is not installed '
4345
'(Run "pip install kafka-python" in your '
4446
'virtualenv).')
4547

4648
super(KafkaManager, self).__init__(channel=channel,
47-
write_only=write_only)
49+
write_only=write_only,
50+
encoder=encoder)
4851

4952
urls = [url] if isinstance(url, str) else url
5053
self.kafka_urls = [url[8:] if url != 'kafka://' else 'localhost:9092'
@@ -54,7 +57,7 @@ def __init__(self, url='kafka://localhost:9092', channel='socketio',
5457
bootstrap_servers=self.kafka_urls)
5558

5659
def _publish(self, data):
57-
self.producer.send(self.channel, value=pickle.dumps(data))
60+
self.producer.send(self.channel, value=self.encoder.dumps(data))
5861
self.producer.flush()
5962

6063
def _kafka_listen(self):
@@ -64,4 +67,4 @@ def _kafka_listen(self):
6467
def _listen(self):
6568
for message in self._kafka_listen():
6669
if message.topic == self.channel:
67-
yield pickle.loads(message.value)
70+
yield message.value

src/socketio/kombu_manager.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -42,20 +42,24 @@ class KombuManager(PubSubManager): # pragma: no cover
4242
``kombu.Queue()``.
4343
:param producer_options: additional keyword arguments to be passed to
4444
``kombu.Producer()``.
45+
:param encoder: The encoder to use for publishing and decoding data,
46+
defaults to pickle.
4547
"""
4648
name = 'kombu'
4749

4850
def __init__(self, url='amqp://guest:guest@localhost:5672//',
4951
channel='socketio', write_only=False, logger=None,
5052
connection_options=None, exchange_options=None,
51-
queue_options=None, producer_options=None):
53+
queue_options=None, producer_options=None,
54+
encoder=pickle):
5255
if kombu is None:
5356
raise RuntimeError('Kombu package is not installed '
5457
'(Run "pip install kombu" in your '
5558
'virtualenv).')
5659
super(KombuManager, self).__init__(channel=channel,
5760
write_only=write_only,
58-
logger=logger)
61+
logger=logger,
62+
encoder=encoder)
5963
self.url = url
6064
self.connection_options = connection_options or {}
6165
self.exchange_options = exchange_options or {}
@@ -103,7 +107,7 @@ def _publish(self, data):
103107
connection = self._connection()
104108
publish = connection.ensure(self.producer, self.producer.publish,
105109
errback=self.__error_callback)
106-
publish(pickle.dumps(data))
110+
publish(self.encoder.dumps(data))
107111

108112
def _listen(self):
109113
reader_queue = self._queue()

src/socketio/pubsub_manager.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@ class PubSubManager(BaseManager):
2323
"""
2424
name = 'pubsub'
2525

26-
def __init__(self, channel='socketio', write_only=False, logger=None):
26+
def __init__(self, channel='socketio', write_only=False, logger=None,
27+
encoder=None):
2728
super(PubSubManager, self).__init__()
2829
self.channel = channel
2930
self.write_only = write_only
3031
self.host_id = uuid.uuid4().hex
3132
self.logger = logger
33+
self.encoder = encoder
3234

3335
def initialize(self):
3436
super(PubSubManager, self).initialize()
@@ -151,7 +153,13 @@ def _thread(self):
151153
if isinstance(message, dict):
152154
data = message
153155
else:
154-
if isinstance(message, bytes): # pragma: no cover
156+
if self.encoder:
157+
try:
158+
data = self.encoder.loads(message)
159+
except:
160+
pass
161+
if data is None and \
162+
isinstance(message, bytes): # pragma: no cover
155163
try:
156164
data = pickle.loads(message)
157165
except:

src/socketio/redis_manager.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,26 @@ class RedisManager(PubSubManager): # pragma: no cover
3636
and receiving.
3737
:param redis_options: additional keyword arguments to be passed to
3838
``Redis.from_url()``.
39+
:param encoder: The encoder to use for publishing and decoding data,
40+
defaults to pickle.
3941
"""
4042
name = 'redis'
4143

4244
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
43-
write_only=False, logger=None, redis_options=None):
45+
write_only=False, logger=None, redis_options=None,
46+
encoder=pickle):
4447
if redis is None:
4548
raise RuntimeError('Redis package is not installed '
4649
'(Run "pip install redis" in your '
4750
'virtualenv).')
51+
self.encoder = encoder
4852
self.redis_url = url
4953
self.redis_options = redis_options or {}
5054
self._redis_connect()
5155
super(RedisManager, self).__init__(channel=channel,
5256
write_only=write_only,
53-
logger=logger)
57+
logger=logger,
58+
encoder=encoder)
5459

5560
def initialize(self):
5661
super(RedisManager, self).initialize()
@@ -78,7 +83,8 @@ def _publish(self, data):
7883
try:
7984
if not retry:
8085
self._redis_connect()
81-
return self.redis.publish(self.channel, pickle.dumps(data))
86+
return self.redis.publish(self.channel,
87+
self.encoder.dumps(data))
8288
except redis.exceptions.RedisError:
8389
if retry:
8490
logger.error('Cannot publish to redis... retrying')

src/socketio/zmq_manager.py

+9-8
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ class ZmqManager(PubSubManager): # pragma: no cover
2929
:param write_only: If set to ``True``, only initialize to emit events. The
3030
default of ``False`` initializes the class for emitting
3131
and receiving.
32+
:param encoder: The encoder to use for publishing and decoding data,
33+
defaults to pickle.
3234
3335
A zmq message broker must be running for the zmq_manager to work.
3436
you can write your own or adapt one from the following simple broker
@@ -50,7 +52,8 @@ class ZmqManager(PubSubManager): # pragma: no cover
5052
def __init__(self, url='zmq+tcp://localhost:5555+5556',
5153
channel='socketio',
5254
write_only=False,
53-
logger=None):
55+
logger=None,
56+
encoder=pickle):
5457
if zmq is None:
5558
raise RuntimeError('zmq package is not installed '
5659
'(Run "pip install pyzmq" in your '
@@ -77,17 +80,18 @@ def __init__(self, url='zmq+tcp://localhost:5555+5556',
7780
self.channel = channel
7881
super(ZmqManager, self).__init__(channel=channel,
7982
write_only=write_only,
80-
logger=logger)
83+
logger=logger,
84+
encoder=encoder)
8185

8286
def _publish(self, data):
83-
pickled_data = pickle.dumps(
87+
encoded_data = self.encoder.dumps(
8488
{
8589
'type': 'message',
8690
'channel': self.channel,
8791
'data': data
8892
}
8993
)
90-
return self.sink.send(pickled_data)
94+
return self.sink.send(encoded_data)
9195

9296
def zmq_listen(self):
9397
while True:
@@ -98,10 +102,7 @@ def zmq_listen(self):
98102
def _listen(self):
99103
for message in self.zmq_listen():
100104
if isinstance(message, bytes):
101-
try:
102-
message = pickle.loads(message)
103-
except Exception:
104-
pass
105+
yield message
105106
if isinstance(message, dict) and \
106107
message['type'] == 'message' and \
107108
message['channel'] == self.channel and \

tests/common/test_pubsub_manager.py

+46-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import functools
22
import logging
3+
import pickle
4+
import json
5+
import marshal
36
import unittest
47
from unittest import mock
58

@@ -365,8 +368,6 @@ def test_background_thread(self):
365368
self.pm._handle_close_room = mock.MagicMock()
366369

367370
def messages():
368-
import pickle
369-
370371
yield {'method': 'emit', 'value': 'foo'}
371372
yield {'missing': 'method'}
372373
yield '{"method": "callback", "value": "bar"}'
@@ -394,3 +395,46 @@ def messages():
394395
self.pm._handle_close_room.assert_called_once_with(
395396
{'method': 'close_room', 'value': 'baz'}
396397
)
398+
399+
def test_background_thread_with_encoder(self):
400+
mock_server = mock.MagicMock()
401+
pm = pubsub_manager.PubSubManager(encoder=marshal)
402+
pm.set_server(mock_server)
403+
pm._publish = mock.MagicMock()
404+
pm._handle_emit = mock.MagicMock()
405+
pm._handle_callback = mock.MagicMock()
406+
pm._handle_disconnect = mock.MagicMock()
407+
pm._handle_close_room = mock.MagicMock()
408+
409+
pm.initialize()
410+
411+
def messages():
412+
yield {'method': 'emit', 'value': 'foo'}
413+
yield marshal.dumps({'method': 'callback', 'value': 'bar'})
414+
yield json.dumps(
415+
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}
416+
)
417+
yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
418+
yield {'method': 'bogus'}
419+
yield 'bad json'
420+
yield b'bad encoding'
421+
422+
pm._listen = mock.MagicMock(side_effect=messages)
423+
424+
try:
425+
pm._thread()
426+
except StopIteration:
427+
pass
428+
429+
pm._handle_emit.assert_called_once_with(
430+
{'method': 'emit', 'value': 'foo'}
431+
)
432+
pm._handle_callback.assert_called_once_with(
433+
{'method': 'callback', 'value': 'bar'}
434+
)
435+
pm._handle_disconnect.assert_called_once_with(
436+
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}
437+
)
438+
pm._handle_close_room.assert_called_once_with(
439+
{'method': 'close_room', 'value': 'baz'}
440+
)

0 commit comments

Comments
 (0)