Skip to content

Commit 0c20d0d

Browse files
author
betzrhodes
authored
Merge pull request #8 from electricimp/develop
Develop
2 parents 2a90456 + 54073fc commit 0c20d0d

File tree

2 files changed

+33
-23
lines changed

2 files changed

+33
-23
lines changed

Diff for: .impt.test

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"deviceGroupId": "ca83d15f-fcb8-d3c2-ff69-b0d4acb20e9b",
33
"deviceGroupName" : "impFarm M",
4-
"timeout": 180,
4+
"timeout": 600,
55
"stopOnFail": false,
66
"allowDisconnect": false,
77
"builderCache": true,

Diff for: tests/Consumer.agent.test.nut

+32-22
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// MIT License
22
//
3-
// Copyright 2017 Electric Imp
3+
// Copyright 2017-2019 Electric Imp
44
//
55
// SPDX-License-Identifier: MIT
66
//
@@ -36,11 +36,11 @@ const RECEIVE_DATA_PERIOD_SEC = 2.0;
3636
class ConsumerTestCase extends ImpTestCase {
3737
_awsProducer = null;
3838
_awsConsumer = null;
39+
_awsBlobConsumer = null;
3940
_recordsCount = null;
4041
_recordsReceived = null;
4142

4243
_counter = null;
43-
_getShardRecordsCounter = null;
4444
_counterStart = null;
4545
_shardIds = null;
4646

@@ -55,31 +55,31 @@ class ConsumerTestCase extends ImpTestCase {
5555
AWS_KINESIS_ACCESS_KEY_ID,
5656
AWS_KINESIS_SECRET_ACCESS_KEY,
5757
AWS_KINESIS_STREAM_NAME);
58+
_awsBlobConsumer = AWSKinesisStreams.Consumer(
59+
AWS_KINESIS_REGION,
60+
AWS_KINESIS_ACCESS_KEY_ID,
61+
AWS_KINESIS_SECRET_ACCESS_KEY,
62+
AWS_KINESIS_STREAM_NAME,
63+
true);
5864

5965
_counter = time();
6066

6167
return _initShardIds();
6268
}
6369

6470
function testGetBlobRecordsLatest() {
65-
local awsBlobConsumer = AWSKinesisStreams.Consumer(
66-
AWS_KINESIS_REGION,
67-
AWS_KINESIS_ACCESS_KEY_ID,
68-
AWS_KINESIS_SECRET_ACCESS_KEY,
69-
AWS_KINESIS_STREAM_NAME,
70-
true);
7171
local shardIterators = {};
7272
return Promise.all(_shardIds.map(function (shardId) {
7373
return _initShardIterator(
74-
awsBlobConsumer, shardIterators, shardId, AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.LATEST);
74+
_awsBlobConsumer, shardIterators, shardId, AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.LATEST);
7575
}.bindenv(this))).
7676
then(function (value) {
7777
_recordsCount = 0;
7878
_counterStart = _counter;
7979
return _putRecordsToAllShards({});
8080
}.bindenv(this)).
8181
then(function (value) {
82-
return _getRecords(awsBlobConsumer, true, shardIterators, 2);
82+
return _getRecords(_awsBlobConsumer, true, shardIterators, 2);
8383
}.bindenv(this)).
8484
fail(function(reason) {
8585
return Promise.reject(reason);
@@ -172,11 +172,11 @@ class ConsumerTestCase extends ImpTestCase {
172172
return _putRecord(_getRecordData(false, "testPartitionKey")).
173173
then(function(value) {
174174
return _initShardIterator(
175-
_awsConsumer, shardIterators, value.shardId,
175+
_awsBlobConsumer, shardIterators, value.shardId,
176176
AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.TRIM_HORIZON);
177177
}.bindenv(this)).
178178
then(function (value) {
179-
return _getRecords(_awsConsumer, false, shardIterators, 10);
179+
return _getRecords(_awsBlobConsumer, true, shardIterators, 10000);
180180
}.bindenv(this)).
181181
fail(function(reason) {
182182
return Promise.reject(reason);
@@ -230,20 +230,28 @@ class ConsumerTestCase extends ImpTestCase {
230230

231231
function _getRecords(awsConsumer, isBlob, shardIterators, limit) {
232232
_recordsReceived = 0;
233-
_getShardRecordsCounter = 0;
234-
return Promise(function (resolve, reject) {
235-
foreach (shardIter in shardIterators) {
236-
_getShardRecords(awsConsumer, isBlob, { "shardIterator" : shardIter, "limit" : limit }, resolve, reject);
237-
}
238-
}.bindenv(this));
233+
local shardIds = [];
234+
foreach (shardId, shardIter in shardIterators) {
235+
shardIds.push(shardId);
236+
}
237+
return Promise.all(
238+
shardIds.map(function (shardId) {
239+
return Promise(function (resolve, reject) {
240+
_getShardRecords(awsConsumer, isBlob, { "shardIterator" : shardIterators[shardId], "limit" : limit }, resolve, reject);
241+
}.bindenv(this));
242+
}.bindenv(this))).
243+
then(function (value) {
244+
if (_recordsReceived < _recordsCount) {
245+
return Promise.reject("Records receiving failed");
246+
}
247+
}.bindenv(this)).
248+
fail(function(reason) {
249+
return Promise.reject(reason);
250+
}.bindenv(this));
239251
}
240252

241253
function _getShardRecords(awsConsumer, isBlob, options, resolve, reject) {
242254
imp.wakeup(RECEIVE_DATA_PERIOD_SEC, function () {
243-
_getShardRecordsCounter++;
244-
if (_getShardRecordsCounter > 2 * _recordsCount) {
245-
return reject("Records receiving failed");
246-
}
247255
awsConsumer.getRecords(
248256
options,
249257
function (error, records, millisBehindLatest, nextOptions) {
@@ -272,6 +280,8 @@ class ConsumerTestCase extends ImpTestCase {
272280
}
273281
if (millisBehindLatest > 0 && nextOptions) {
274282
_getShardRecords(awsConsumer, isBlob, nextOptions, resolve, reject);
283+
} else {
284+
return resolve("");
275285
}
276286
}
277287
}.bindenv(this));

0 commit comments

Comments
 (0)