Skip to content

Commit

Permalink
Support async load of persistence in the abstract test. Bumped v1.1.0.
Browse files Browse the repository at this point in the history
  • Loading branch information
mcollina committed Jan 6, 2016
1 parent 15dc6ac commit 5a68d09
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 43 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,25 @@ abs({
})
```

If you require some async stuff before returning, a callback is also
supported:

```js
var test = require('tape').test
var myperst = require('./')
var abs = require('aedes-persistence/abstract')
var clean = require('./clean') // invented module

abs({
test: test,
persistence: function build (cb) {
clean(function (err) {
cb(err, myperst())
})
}
})
```

## License

MIT
84 changes: 42 additions & 42 deletions abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ function abstractPersistence (opts) {
var test = opts.test
var persistence = opts.persistence

if (persistence.length === 0) {
persistence = function asyncify (cb) {
cb(null, opts.persistence())
}
}

function storeRetained (instance, opts, cb) {
opts = opts || {}

Expand All @@ -26,16 +32,27 @@ function abstractPersistence (opts) {
}

function matchRetainedWithPattern (t, pattern, opts) {
var instance = persistence()
persistence(function (err, instance) {
if (err) { throw err }

storeRetained(instance, opts, function (err, packet) {
t.notOk(err, 'no error')
var stream = instance.createRetainedStream(pattern)
storeRetained(instance, opts, function (err, packet) {
t.notOk(err, 'no error')
var stream = instance.createRetainedStream(pattern)

stream.pipe(concat(function (list) {
t.deepEqual(list, [packet], 'must return the packet')
instance.destroy(t.end.bind(t))
}))
stream.pipe(concat(function (list) {
t.deepEqual(list, [packet], 'must return the packet')
instance.destroy(t.end.bind(t))
}))
})
})
}

function testInstance (title, cb) {
test(title, function (t) {
persistence(function (err, instance) {
if (err) { throw err }
cb(t, instance)
})
})
}

Expand All @@ -51,8 +68,7 @@ function abstractPersistence (opts) {
matchRetainedWithPattern(t, 'hello/+')
})

test('remove retained message', function (t) {
var instance = persistence()
testInstance('remove retained message', function (t, instance) {
storeRetained(instance, {}, function (err, packet) {
t.notOk(err, 'no error')
storeRetained(instance, {
Expand All @@ -70,8 +86,7 @@ function abstractPersistence (opts) {
})
})

test('storing twice a retained message should keep only the last', function (t) {
var instance = persistence()
testInstance('storing twice a retained message should keep only the last', function (t, instance) {
storeRetained(instance, {}, function (err, packet) {
t.notOk(err, 'no error')
storeRetained(instance, {
Expand All @@ -89,8 +104,7 @@ function abstractPersistence (opts) {
})
})

test('store and look up subscriptions by client', function (t) {
var instance = persistence()
testInstance('store and look up subscriptions by client', function (t, instance) {
var client = { id: 'abcde' }
var subs = [{
topic: 'hello',
Expand All @@ -115,8 +129,7 @@ function abstractPersistence (opts) {
})
})

test('remove subscriptions by client', function (t) {
var instance = persistence()
testInstance('remove subscriptions by client', function (t, instance) {
var client = { id: 'abcde' }
var subs = [{
topic: 'hello',
Expand Down Expand Up @@ -144,8 +157,7 @@ function abstractPersistence (opts) {
})
})

test('store and look up subscriptions by topic', function (t) {
var instance = persistence()
testInstance('store and look up subscriptions by topic', function (t, instance) {
var client = { id: 'abcde' }
var subs = [{
topic: 'hello',
Expand Down Expand Up @@ -176,8 +188,7 @@ function abstractPersistence (opts) {
})
})

test('QoS 0 subscriptions, restored but not matched', function (t) {
var instance = persistence()
testInstance('QoS 0 subscriptions, restored but not matched', function (t, instance) {
var client = { id: 'abcde' }
var subs = [{
topic: 'hello',
Expand Down Expand Up @@ -208,8 +219,7 @@ function abstractPersistence (opts) {
})
})

test('clean subscriptions', function (t) {
var instance = persistence()
testInstance('clean subscriptions', function (t, instance) {
var client = { id: 'abcde' }
var subs = [{
topic: 'hello',
Expand Down Expand Up @@ -237,8 +247,7 @@ function abstractPersistence (opts) {
})
})

test('store and count subscriptions', function (t) {
var instance = persistence()
testInstance('store and count subscriptions', function (t, instance) {
var client = { id: 'abcde' }
var subs = [{
topic: 'hello',
Expand Down Expand Up @@ -275,8 +284,7 @@ function abstractPersistence (opts) {
})
})

test('add outgoing packet and stream it', function (t) {
var instance = persistence()
testInstance('add outgoing packet and stream it', function (t, instance) {
var sub = {
clientId: 'abcde',
topic: 'hello',
Expand Down Expand Up @@ -318,8 +326,7 @@ function abstractPersistence (opts) {
})
})

test('add outgoing packet and stream it twice', function (t) {
var instance = persistence()
testInstance('add outgoing packet and stream it twice', function (t, instance) {
var sub = {
clientId: 'abcde',
topic: 'hello',
Expand Down Expand Up @@ -384,8 +391,7 @@ function abstractPersistence (opts) {
})
}

test('add outgoing packet and update messageId', function (t) {
var instance = persistence()
testInstance('add outgoing packet and update messageId', function (t, instance) {
var sub = {
clientId: 'abcde', topic: 'hello', qos: 1
}
Expand Down Expand Up @@ -414,8 +420,7 @@ function abstractPersistence (opts) {
})
})

test('add 2 outgoing packet and clear messageId', function (t) {
var instance = persistence()
testInstance('add 2 outgoing packet and clear messageId', function (t, instance) {
var sub = {
clientId: 'abcde', topic: 'hello', qos: 1
}
Expand Down Expand Up @@ -461,8 +466,7 @@ function abstractPersistence (opts) {
})
})

test('update to pubrel', function (t) {
var instance = persistence()
testInstance('update to pubrel', function (t, instance) {
var sub = {
clientId: 'abcde', topic: 'hello', qos: 1
}
Expand Down Expand Up @@ -510,8 +514,7 @@ function abstractPersistence (opts) {
})
})

test('add incoming packet, get it, and clear with messageId', function (t) {
var instance = persistence()
testInstance('add incoming packet, get it, and clear with messageId', function (t, instance) {
var client = {
id: 'abcde'
}
Expand Down Expand Up @@ -557,8 +560,7 @@ function abstractPersistence (opts) {
})
})

test('store, fetch and delete will message', function (t) {
var instance = persistence()
testInstance('store, fetch and delete will message', function (t, instance) {
var broker = {
id: 'mybrokerId'
}
Expand Down Expand Up @@ -596,8 +598,7 @@ function abstractPersistence (opts) {
})
})

test('stream all will messages', function (t) {
var instance = persistence()
testInstance('stream all will messages', function (t, instance) {
var broker = {
id: 'mybrokerId'
}
Expand Down Expand Up @@ -630,8 +631,7 @@ function abstractPersistence (opts) {
})
})

test('stream all will message for unknown brokers', function (t) {
var instance = persistence()
testInstance('stream all will message for unknown brokers', function (t, instance) {
var broker = {
id: 'mybrokerId'
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "aedes-persistence",
"version": "1.0.2",
"version": "1.1.0",
"description": "The spec for an Aedes persistence, with abstract tests and a fast in-memory implementation.",
"main": "persistence.js",
"scripts": {
Expand Down

0 comments on commit 5a68d09

Please sign in to comment.