Skip to content

Commit

Permalink
get topics metdata
Browse files Browse the repository at this point in the history
  • Loading branch information
krystianity committed Jun 27, 2018
1 parent 2991d27 commit c989682
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 146 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,14 @@
# sinek CHANGELOG

## 2018-06-27, Version 6.18.0

* brought getTopicMetadata and getMetadata to NConsumer, was only available on NProducer so far
* added getTopicList to NProducer and NConsumer to retrieve a list of available topics
* updated dependencies:

node-rdkafka ~2.3.3 → ~2.3.4
(also sinon, eslint and uuid)

## 2018-05-31, Version 6.17.0

* switched default encoding for messages value and key for JS Kafka client to Buffer
Expand Down
4 changes: 3 additions & 1 deletion lib/librdkafka/Metadata.js
Expand Up @@ -35,7 +35,9 @@ class Metadata {
* returns a list of topic names
*/
asTopicList(){
return this.raw.topics.map(topic => topic.name);
return this.raw.topics.map(topic => topic.name).filter(topic => {
return topic !== "__consumer_offsets";
});
}

/**
Expand Down
47 changes: 47 additions & 0 deletions lib/librdkafka/NConsumer.js
Expand Up @@ -7,6 +7,7 @@ const async = require("async");

const {ConsumerAnalytics} = require("./Analytics.js");
const {ConsumerHealth} = require("./Health.js");
const Metadata = require("./Metadata.js");

//@OPTIONAL
let BlizzKafka = null;
Expand Down Expand Up @@ -1025,6 +1026,52 @@ class NConsumer extends EventEmitter {
checkHealth(){
return this._health.check();
}

/**
* resolve the metadata information for a give topic
* will create topic if it doesnt exist
* @param {string} topic - name of the topic to query metadata for
* @param {number} timeout - optional, default is 2500
* @returns {Promise.<Metadata>}
*/
getTopicMetadata(topic, timeout = 2500) {
return new Promise((resolve, reject) => {

if (!this.consumer) {
return reject(new Error("You must call and await .connect() before trying to get metadata."));
}

this.consumer.getMetadata({
topic,
timeout
}, (error, raw) => {

if (error) {
return reject(error);
}

resolve(new Metadata(raw));
});
});
}

/**
* @alias getTopicMetadata
* @param {number} timeout - optional, default is 2500
* @returns {Promise.<Metadata>}
*/
getMetadata(timeout = 2500) {
return this.getTopicMetadata(null, timeout);
}

/**
* returns a list of available kafka topics on the connected brokers
* @param {number} timeout
*/
async getTopicList(timeout = 2500){
const metadata = await this.getMetadata(timeout);
return metadata.asTopicList();
}
}

module.exports = NConsumer;
11 changes: 11 additions & 0 deletions lib/librdkafka/NProducer.js
Expand Up @@ -8,9 +8,11 @@ const debug = require("debug");
const murmur2Partitioner = require("murmur2-partitioner");

const Metadata = require("./Metadata.js");

const {
ProducerAnalytics
} = require("./Analytics.js");

const {
ProducerHealth
} = require("./Health.js");
Expand Down Expand Up @@ -550,6 +552,15 @@ class NProducer extends EventEmitter {
return this.getTopicMetadata(null, timeout);
}

/**
* returns a list of available kafka topics on the connected brokers
* @param {number} timeout
*/
async getTopicList(timeout = 2500){
const metadata = await this.getMetadata(timeout);
return metadata.asTopicList();
}

/**
* @async
* gets the partition count of the topic from the brokers metadata
Expand Down
10 changes: 5 additions & 5 deletions package.json
@@ -1,6 +1,6 @@
{
"name": "sinek",
"version": "6.17.0",
"version": "6.18.0",
"description": "Node.js kafka client, consumer, producer polite out of the box",
"main": "index.js",
"scripts": {
Expand Down Expand Up @@ -52,17 +52,17 @@
"kafka-node": "~2.6.1",
"murmur2-partitioner": "~1.0.0",
"murmurhash": "~0.0.2",
"uuid": "~3.2.1"
"uuid": "~3.3.0"
},
"devDependencies": {
"eslint": "~4.19.1",
"eslint": "~5.0.1",
"expect.js": "~0.3.1",
"express": "~4.16.3",
"istanbul": "~0.4.5",
"mocha": "~5.2.0",
"sinon": "^6.0.0"
"sinon": "^6.0.1"
},
"optionalDependencies": {
"node-rdkafka": "~2.3.3"
"node-rdkafka": "~2.3.4"
}
}
5 changes: 4 additions & 1 deletion test/int/NSinek.test.js
Expand Up @@ -67,7 +67,10 @@ describe("Native Client INT", () => {

it("should have received first message", done => {
assert.ok(firstMessageReceived);
done();
producer.getTopicList().then((topics) => {
assert.ok(topics.length);
done();
});
});

it("should be able to consume messages", done => {
Expand Down

0 comments on commit c989682

Please sign in to comment.