1
1
// MIT License
2
2
//
3
- // Copyright 2017 Electric Imp
3
+ // Copyright 2017-2019 Electric Imp
4
4
//
5
5
// SPDX-License-Identifier: MIT
6
6
//
@@ -36,11 +36,11 @@ const RECEIVE_DATA_PERIOD_SEC = 2.0;
36
36
class ConsumerTestCase extends ImpTestCase {
37
37
_awsProducer = null ;
38
38
_awsConsumer = null ;
39
+ _awsBlobConsumer = null ;
39
40
_recordsCount = null ;
40
41
_recordsReceived = null ;
41
42
42
43
_counter = null ;
43
- _getShardRecordsCounter = null ;
44
44
_counterStart = null ;
45
45
_shardIds = null ;
46
46
@@ -55,31 +55,31 @@ class ConsumerTestCase extends ImpTestCase {
55
55
AWS_KINESIS_ACCESS_KEY_ID,
56
56
AWS_KINESIS_SECRET_ACCESS_KEY,
57
57
AWS_KINESIS_STREAM_NAME);
58
+ _awsBlobConsumer = AWSKinesisStreams. Consumer (
59
+ AWS_KINESIS_REGION,
60
+ AWS_KINESIS_ACCESS_KEY_ID,
61
+ AWS_KINESIS_SECRET_ACCESS_KEY,
62
+ AWS_KINESIS_STREAM_NAME,
63
+ true );
58
64
59
65
_counter = time ();
60
66
61
67
return _initShardIds ();
62
68
}
63
69
64
70
function testGetBlobRecordsLatest () {
65
- local awsBlobConsumer = AWSKinesisStreams. Consumer (
66
- AWS_KINESIS_REGION,
67
- AWS_KINESIS_ACCESS_KEY_ID,
68
- AWS_KINESIS_SECRET_ACCESS_KEY,
69
- AWS_KINESIS_STREAM_NAME,
70
- true );
71
71
local shardIterators = {};
72
72
return Promise. all (_shardIds. map (function (shardId) {
73
73
return _initShardIterator (
74
- awsBlobConsumer , shardIterators, shardId, AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE. LATEST );
74
+ _awsBlobConsumer , shardIterators, shardId, AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE. LATEST );
75
75
}. bindenv (this ))).
76
76
then (function (value) {
77
77
_recordsCount = 0 ;
78
78
_counterStart = _counter;
79
79
return _putRecordsToAllShards ({});
80
80
}. bindenv (this )).
81
81
then (function (value) {
82
- return _getRecords (awsBlobConsumer , true , shardIterators, 2 );
82
+ return _getRecords (_awsBlobConsumer , true , shardIterators, 2 );
83
83
}. bindenv (this )).
84
84
fail (function (reason) {
85
85
return Promise. reject (reason);
@@ -172,11 +172,11 @@ class ConsumerTestCase extends ImpTestCase {
172
172
return _putRecord (_getRecordData (false , " testPartitionKey" )).
173
173
then (function (value) {
174
174
return _initShardIterator (
175
- _awsConsumer , shardIterators, value. shardId ,
175
+ _awsBlobConsumer , shardIterators, value. shardId ,
176
176
AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE. TRIM_HORIZON );
177
177
}. bindenv (this )).
178
178
then (function (value) {
179
- return _getRecords (_awsConsumer, false , shardIterators, 10 );
179
+ return _getRecords (_awsBlobConsumer, true , shardIterators, 10000 );
180
180
}. bindenv (this )).
181
181
fail (function (reason) {
182
182
return Promise. reject (reason);
@@ -230,20 +230,28 @@ class ConsumerTestCase extends ImpTestCase {
230
230
231
231
function _getRecords (awsConsumer, isBlob, shardIterators, limit) {
232
232
_recordsReceived = 0 ;
233
- _getShardRecordsCounter = 0 ;
234
- return Promise (function (resolve, reject) {
235
- foreach (shardIter in shardIterators) {
236
- _getShardRecords (awsConsumer, isBlob, { " shardIterator" : shardIter, " limit" : limit }, resolve, reject);
237
- }
238
- }. bindenv (this ));
233
+ local shardIds = [];
234
+ foreach (shardId, shardIter in shardIterators) {
235
+ shardIds. push (shardId);
236
+ }
237
+ return Promise. all (
238
+ shardIds. map (function (shardId) {
239
+ return Promise (function (resolve, reject) {
240
+ _getShardRecords (awsConsumer, isBlob, { " shardIterator" : shardIterators[shardId], " limit" : limit }, resolve, reject);
241
+ }. bindenv (this ));
242
+ }. bindenv (this ))).
243
+ then (function (value) {
244
+ if (_recordsReceived < _recordsCount) {
245
+ return Promise. reject (" Records receiving failed" );
246
+ }
247
+ }. bindenv (this )).
248
+ fail (function (reason) {
249
+ return Promise. reject (reason);
250
+ }. bindenv (this ));
239
251
}
240
252
241
253
function _getShardRecords (awsConsumer, isBlob, options, resolve, reject) {
242
254
imp. wakeup (RECEIVE_DATA_PERIOD_SEC, function () {
243
- _getShardRecordsCounter++ ;
244
- if (_getShardRecordsCounter > 2 * _recordsCount) {
245
- return reject (" Records receiving failed" );
246
- }
247
255
awsConsumer. getRecords (
248
256
options,
249
257
function (error, records, millisBehindLatest, nextOptions) {
@@ -272,6 +280,8 @@ class ConsumerTestCase extends ImpTestCase {
272
280
}
273
281
if (millisBehindLatest > 0 && nextOptions) {
274
282
_getShardRecords (awsConsumer, isBlob, nextOptions, resolve, reject);
283
+ } else {
284
+ return resolve (" " );
275
285
}
276
286
}
277
287
}. bindenv (this ));
0 commit comments