Skip to content

Commit

Permalink
js client messages as buffer, dep upgrade, simple tests
Browse files Browse the repository at this point in the history
  • Loading branch information
krystianity committed Jun 15, 2018
1 parent c19f1b3 commit 2991d27
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 1,039 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,18 @@
# sinek CHANGELOG

## 2018-05-31, Version 6.17.0

* switched default encoding for messages value and key for JS Kafka client to Buffer
* simplified integration tests
* updated dependencies:

kafka-node ~2.4.1 → ~2.6.1
eslint ~4.18.2 → ~4.19.1
mocha ~5.0.4 → ~5.2.0
sinon ^4.4.6 → ^6.0.0
node-rdkafka ~2.2.3 → ~2.3.3
async ~2.6.0 → ~2.6.1

## 2018-05-31, Version 6.16.0

* updated NConsumer and NProducer to debug and concat errors of require of native lib
Expand Down
16 changes: 8 additions & 8 deletions package.json
@@ -1,6 +1,6 @@
{
"name": "sinek",
"version": "6.16.0",
"version": "6.17.0",
"description": "Node.js kafka client, consumer, producer polite out of the box",
"main": "index.js",
"scripts": {
Expand All @@ -10,7 +10,7 @@
"kafka:stop": "./kafka-setup/stop.sh",
"kafka:logs": "docker-compose --file ./kafka-setup/docker-compose.yml logs -f",
"kafka:console": "./kafka-setup/kafka-console.sh",
"test": "npm run lint && _mocha -- -R spec --exit --timeout 10000 test/int/*",
"test": "_mocha --recursive --timeout 32500 --exit -R spec test/int",
"rd-lt": "NODE_ENV=production node ./perf/RDLoadTest.js"
},
"repository": {
Expand Down Expand Up @@ -46,23 +46,23 @@
},
"homepage": "https://github.com/nodefluent/node-sinek#readme",
"dependencies": {
"async": "~2.6.0",
"async": "~2.6.1",
"bluebird": "~3.5.1",
"debug": "~3.1.0",
"kafka-node": "~2.4.1",
"kafka-node": "~2.6.1",
"murmur2-partitioner": "~1.0.0",
"murmurhash": "~0.0.2",
"uuid": "~3.2.1"
},
"devDependencies": {
"eslint": "~4.18.2",
"eslint": "~4.19.1",
"expect.js": "~0.3.1",
"express": "~4.16.3",
"istanbul": "~0.4.5",
"mocha": "~5.0.4",
"sinon": "^4.4.6"
"mocha": "~5.2.0",
"sinon": "^6.0.0"
},
"optionalDependencies": {
"node-rdkafka": "~2.2.3"
"node-rdkafka": "~2.3.3"
}
}
22 changes: 21 additions & 1 deletion test/nconfig.js → test/config.js
Expand Up @@ -41,6 +41,26 @@ const consumerConfig = Object.assign({}, config, {
}
});

const jsProducerConfig = {
kafkaHost: "localhost:9092",
clientName: "n-test-producer-js",
options: {}
};

const jsConsumerConfig = {
kafkaHost: "localhost:9092",
groupId: "n-test-group-js",
options: {
autoCommit: true
}
};

const topic = "n-test-topic";

module.exports = {producerConfig, consumerConfig, topic};
module.exports = {
producerConfig,
consumerConfig,
jsProducerConfig,
jsConsumerConfig,
topic
};
2 changes: 1 addition & 1 deletion test/int/Health.test.js
Expand Up @@ -10,7 +10,7 @@ const {
ProducerHealth
} = Health;

describe("Health", () => {
describe("Health UNIT", () => {

const getFakeProducerAnalyticsResult = (produced = 0, errors = 0) => {
return {
Expand Down
23 changes: 13 additions & 10 deletions test/int/NSinekF.test.js → test/int/JSSinek.test.js
@@ -1,10 +1,10 @@
"use strict";

const assert = require("assert");
const {NConsumer, NProducer} = require("./../../index.js");
const {producerConfig, consumerConfig, topic} = require("./../nconfig.js");
const {Consumer, Producer} = require("./../../index.js");
const {jsProducerConfig, jsConsumerConfig, topic} = require("../config");

describe("NSinek INT String (fast)", () => {
describe("Javascript Client INT", () => {

let consumer = null;
let producer = null;
Expand All @@ -14,19 +14,22 @@ describe("NSinek INT String (fast)", () => {

before(done => {

producer = new NProducer(producerConfig);
consumer = new NConsumer([topic], consumerConfig);
producer = new Producer(jsProducerConfig);
consumer = new Consumer([topic], jsConsumerConfig);

producer.on("error", error => console.error(error));
consumer.on("error", error => console.error(error));

Promise.all([
producer.connect(),
consumer.connect()
consumer.connect(false)
]).then(() => {
consumer.on("message", message => consumedMessages.push(message));
consumer.consume().then(() => {
firstMessageReceived = true;
consumer.consume();
consumer.on("message", message => {
consumedMessages.push(message);
if(!firstMessageReceived){
firstMessageReceived = true;
}
});
setTimeout(done, 1000);
});
Expand Down Expand Up @@ -68,7 +71,7 @@ describe("NSinek INT String (fast)", () => {
});

it("should be able to consume messages", done => {
//console.log(consumedMessages);
console.log(consumedMessages);
assert.ok(consumedMessages.length);
assert.ok(!Buffer.isBuffer(consumedMessages[0].value));
assert.equal(consumedMessages[0].value, "a message");
Expand Down
144 changes: 18 additions & 126 deletions test/int/NSinek.test.js
Expand Up @@ -2,66 +2,36 @@

const assert = require("assert");
const {NConsumer, NProducer} = require("./../../index.js");
const {producerConfig, consumerConfig, topic} = require("./../nconfig.js");
const {producerConfig, consumerConfig, topic} = require("../config");

describe("NSinek INT Buffer (1by1)", () => {
describe("Native Client INT", () => {

let consumer = null;
let producer = null;
const consumedMessages = [];
let firstMessageReceived = false;
let messagesChecker;

let producerAnalyticsResult = null;
let consumerAnalyticsResult = null;
let commitCount = -1;
let commits = 0;
let comittedMessages = 0;

const oneByNModeOptions = {
batchSize: 1,
commitEveryNBatch: 1,
concurrency: 1,
commitSync: true
};

before(done => {

producer = new NProducer(producerConfig, null, "auto");
producer = new NProducer(producerConfig);
consumer = new NConsumer([topic], consumerConfig);

const analyticsOptions = {
analyticsInterval: 500,
lagFetchInterval: 1000
};

producer.enableAnalytics(analyticsOptions);
producer.on("analytics", res => producerAnalyticsResult = res);

consumer.on("analytics", res => consumerAnalyticsResult = res);
consumer.enableAnalytics(analyticsOptions);

consumer.on("commit", messageCount => {
//console.log("com", messageCount);
commitCount = messageCount;
commits++;
comittedMessages += messageCount;
});

producer.on("error", error => console.error(error));
consumer.on("error", error => console.error(error));

Promise.all([
producer.connect(),
consumer.connect()
]).then(() => {
consumer.consume((message, callback) => {
consumer.consume();
consumer.on("message", message => {
consumedMessages.push(message);
callback();
}, false, false, oneByNModeOptions).then(() => {
firstMessageReceived = true;
if(!firstMessageReceived){
firstMessageReceived = true;
}
});
setTimeout(done, 1900);
setTimeout(done, 1000);
});
});

Expand All @@ -73,28 +43,22 @@ describe("NSinek INT Buffer (1by1)", () => {
}
});

it("should make sure topic exists (get metdata)", () => {
return producer.getTopicMetadata(topic);
});

it("should be able to produce messages", () => {

const promises = [
producer.send(topic, "a message"),
producer.bufferFormatPublish(topic, "1", {content: "a message 1"}, 1, null, 0),
producer.bufferFormatUpdate(topic, "2", {content: "a message 2"}, 1, null, 0),
producer.bufferFormatUnpublish(topic, "3", {content: "a message 3"}, 1, null, 0),
producer.send(topic, new Buffer("a message buffer")),
producer.send(topic, new Buffer("a message buffer"))
];

return Promise.all(promises);
});

it("should be able to wait", function(done){
this.timeout(10000);
it("should be able to wait", done => {
messagesChecker = setInterval(()=>{
if(consumedMessages.length >= 6){
if(consumedMessages.length >= 5){
clearInterval(messagesChecker);
done();
}
Expand All @@ -107,86 +71,14 @@ describe("NSinek INT Buffer (1by1)", () => {
});

it("should be able to consume messages", done => {

assert.equal(consumedMessages.length, 6);
assert.equal(consumedMessages.length, producer.getStats().totalPublished);
assert.equal(consumedMessages.length, consumer.getStats().totalIncoming);

console.log(consumedMessages);
assert.ok(consumedMessages.length);
assert.ok(Buffer.isBuffer(consumedMessages[0].value));
assert.equal(consumedMessages[0].value.toString("utf8"), "a message");
assert.equal(JSON.parse(consumedMessages[1].value.toString("utf8")).payload.content, "a message 1");
assert.equal(JSON.parse(consumedMessages[2].value.toString("utf8")).payload.content, "a message 2");
assert.equal(JSON.parse(consumedMessages[3].value.toString("utf8")).payload.content, "a message 3");
assert.equal(consumedMessages[4].value.toString("utf8"), "a message buffer");
assert.ok(!Buffer.isBuffer(consumedMessages[0].value));
assert.equal(consumedMessages[0].value, "a message");
assert.equal(JSON.parse(consumedMessages[1].value).payload.content, "a message 1");
assert.equal(JSON.parse(consumedMessages[2].value).payload.content, "a message 2");
assert.equal(JSON.parse(consumedMessages[3].value).payload.content, "a message 3");
assert.equal(consumedMessages[4].value, "a message buffer");
done();
});

it("should be able to get partition count for topic", done => {
producer.getPartitionCountOfTopic(topic).then(count => {
//console.log(count);
//console.log(producer.getStoredPartitionCounts());
assert.ok(count);
assert.ok(producer.getStoredPartitionCounts()[topic].count);
done();
});
});

it("should be able to get partition count for topic (from cache)", done => {
producer.getPartitionCountOfTopic(topic).then(count => {
//console.log(count);
//console.log(producer.getStoredPartitionCounts());
assert.ok(count);
assert.ok(producer.getStoredPartitionCounts()[topic].count);
done();
});
});

it("should return -1 on error", done => {
producer.getPartitionCountOfTopic("dont-exist-x").then(count => {
//console.log(count);
assert.equal(count, -1);
done();
});
});

it("should be able to get offsets for topic", () => {
return consumer.getOffsetForTopicPartition(topic, 0);
});

it("should be able to get comitted offsets", () => {
consumer.getAssignedPartitions();
return consumer.getComittedOffsets();
});

it("should be able to get lag infos for consumer", () => {
return consumer.getLagStatus().then(awass => {
console.log(awass);
return true;
});
});

it("should be able to see producer analytics data", () => {
assert.ok(producerAnalyticsResult);
console.log(producer.getAnalytics());
});

it("should be able to see consumer analytics data", () => {
assert.ok(consumerAnalyticsResult);
console.log(JSON.stringify(consumer.getAnalytics(), null, 4));
});

it("should be able to see correct amount of commits", () => {
console.log(commitCount, comittedMessages, commits);
assert.equal(oneByNModeOptions.batchSize, commitCount);
assert.equal(comittedMessages, 6);
assert.equal(commits, 6);
});

it("should be able to make health checks", () => {
return Promise.all([
consumer.checkHealth().then(console.log),
producer.checkHealth().then(console.log)
]);
});
});

0 comments on commit 2991d27

Please sign in to comment.