Skip to content

Support for Frando's kappa next #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 108 additions & 78 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Kappa View Query

`kappa-view-query` is a materialised view to be used with kappa-core. It provides an API that allows you to define your own indexes and execute custom [map-filter-reduce](https://github.com/dominictarr/map-filter-reduce) queries over a collection of hypercores.
`kappa-view-query` is a materialised view to be used with kappa-core. It provides an API that allows you to define your own indexes and execute custom [map-filter-reduce](https://github.com/dominictarr/map-filter-reduce) queries over your indexes.

`kappa-view-query` is inspired by [flumeview-query](https://github.com/flumedb/flumeview-query). It uses the same scoring system for determining the most efficient index relevant to the provided query.

Expand Down Expand Up @@ -50,93 +50,119 @@ In the case of the above dataset and query, the closest matching index is the on

## Usage

### Hypercore

This example uses a single hypercore and collects all messages at a given point in time.

```js
const kappa = require('kappa-core')
const Query = require('./')
const Kappa = require('kappa-core')
const hypercore = require('hypercore')
const ram = require('random-access-memory')
const collect = require('collect-stream')
const memdb = require('memdb')
const sub = require('subleveldown')

const Query = require('./')
const { validator, fromHypercore } = require('./util')
const { cleaup, tmp } = require('./test/util')

const seedData = require('./test/seeds.json')

// Initialised your kappa-core back-end
const core = kappa(ram, { valueEncoding: 'json' })
const core = new Kappa()
const feed = hypercore(ram, { valueEncoding: 'json' })
const db = memdb()

// Define a validator or a message decoder to determine if a message should be indexed or not
function validator (msg) {
if (typeof msg !== 'object') return null
if (typeof msg.value !== 'object') return null
if (typeof msg.value.timestamp !== 'number') return null
if (typeof msg.value.type !== 'string') return null
return msg
}
core.use('query', createHypercoreSource({ feed, db: sub(db, 'state') }), Query(sub(db, 'view'), {
indexes: [
{ key: 'log', value: [['value', 'timestamp']] },
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }
],
// you can pass a custom validator function to ensure all messages entering a feed match a specific format
validator,
// implement your own getMessage function, and perform any desired validation on each message returned by the query
getMessage: fromHypercore(feed)
}))

feed.append(seedData, (err, _) => {
core.ready('query', () => {
const query = [{ $filter: { value: { type: 'chat/message' } }]

// grab then log all chat/message message types up until this point
collect(core.view.query.read({ query }), (err, chats) => {
if (err) return console.error(err)
console.log(chats)

// grab then log all user/about message types up until this point
collect(core.view.query.read({ query: [{ $filter: { value: { type: 'user/about' } } }] }), (err, users) => {
if (err) return console.error(err)
console.log(users)
})
})
})
})
```

// here's an alternative using protocol buffers, assuming a message schema exists
const { Message } = protobuf(fs.readFileSync(path.join(path.resolve(__dirname), 'message.proto')))
### Multifeed

function validator (msg) {
try { msg.value = Message.decode(msg.value) }
catch (err) { return console.error(err) && false }
return msg
}
This example uses a multifeed instance for managing hypercores and sets up two live streams to dump messages to the console as they arrive.

// Define a set of indexes under a namespace
const indexes = [
{ key: 'log', value: [['value', 'timestamp']] },
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] },
{ key: 'cha', value: [['value', 'type'], ['value', 'content', 'channel']] }
]

core.use('query', Query(db, { indexes, validator }))

core.writer('local', (err, feed) => {
// Populate our feed with some messages
const data = [{
type: 'chat/message',
timestamp: Date.now(),
content: { body: 'Hi im new here...' }
}, {
type: 'user/about',
timestamp: Date.now(),
content: { name: 'Grace' }
}, {
type: 'chat/message',
timestamp: Date.now(),
content: { body: 'Second post' }
}, {
type: 'chat/message',
timestamp: Date.now(),
content: { channel: 'dogs', body: 'Lurchers rule' }
}, {
type: 'chat/message',
timestamp: Date.now(),
content: { channel: 'dogs', body: 'But sometimes I prefer labra-doodles' }
}, {
type: 'user/about',
timestamp: Date.now(),
content: { name: 'Poison Ivy' }
}]

feed.append(data, (err, seq) => {
// Define a query: filter where the message value contains type 'chat/message', and the content references the channel 'dogs'
const query = [{ $filter: { value: { type: 'chat/message', content: { channel: 'dogs' } } } }]

core.ready('query', () => {
// For static queries
collect(core.api.query.read({ query }), (err, msgs) => {
console.log(msgs)

// Logs all messages of type chat/message that reference the dogs channel, and order by timestamp...
// {
// type: 'chat/message',
// timestamp: 1561996331743,
// content: { channel: 'dogs', body: 'Lurchers rule' }
// }
// {
// type: 'chat/message',
// timestamp: Date.now(),
// content: { channel: 'dogs', body: 'But sometimes I prefer labra-doodles' }
// }
})
```js
const Kappa = require('kappa-core')
const multifeed = require('multifeed')
const ram = require('random-access-memory')
const collect = require('collect-stream')
const memdb = require('memdb')
const sub = require('subleveldown')

const Query = require('./')
const { validator, fromMultifeed } = require('./util')
const { cleaup, tmp } = require('./test/util')

const seedData = require('./test/seeds.json')

const core = new Kappa()
const feeds = multifeed(ram, { valueEncoding: 'json' })
const db = memdb()

core.use('query', createMultifeedSource({ feeds, db: sub(db, 'state') }), Query(sub(db, 'view'), {
indexes: [
{ key: 'log', value: [['value', 'timestamp']] },
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }
],
validator,
// make sure you define your own getMessage function, otherwise nothing will be returned by your queries
getMessage: fromMultifeed(feeds)
}))

core.ready('query', () => {
// setup a live query to first log all chat/message
core.view.query.ready({
query: [{ $filter: { value: { type: 'chat/message' } } }],
live: true,
old: false
}).on('data', (msg) => {
if (msg.sync) return next()
console.log(msg)
})

function next () {
// then to first log all user/about
core.view.query.read({
query: [{ $filter: { value: { type: 'user/about' } } }],
live: true,
old: false
}).on('data', (msg) => {
console.log(msg)
})
}
})

// then append a bunch of data to two different feeds in a multifeed
feeds.writer('one', (err, one) => {
feeds.writer('two', (err, two) => {

one.append(seedData.slice(0, 3))
two.append(seedData.slice(3, 5))
})
})
```
Expand All @@ -148,6 +174,7 @@ const View = require('kappa-view-query')
```

Expects a LevelUP or LevelDOWN instance `leveldb`.
Expects a `getMessage` function to use your defined index to grab the message from the feed.

```js
// returns a readable stream
Expand Down Expand Up @@ -180,3 +207,6 @@ kappa-view-query was built by [@kyphae](https://github.com/kyphae/) and assisted
### 2.0.7
- Fixed an outstanding issue where live streams were not working. Queries with `{ live: true }` setup will now properly pipe messages through as they are indexed.
- Fixed an outstanding issue where messages with a matching timestamp were colliding, where the last indexed would over-writing the previous. Messages are now indexed, on top of provided values, on the sequence number and the feed id, for guaranteed uniqueness.

### 3.0.0 (not yet released)
- Updated to use the experimental version of kappa-core which includes breaking API changes. See Frando's [kappa-core fork](https://github.com/Frando/kappa-core/tree/kappa5-new).
63 changes: 0 additions & 63 deletions example.js

This file was deleted.

Loading