The mark of a master: serving a billion people while making each one feel served alone......

Kafka and CountDownLatch


A while back I was measuring the QPS of Kafka message sends. At first I used LoadRunner with a Java vuser to drive the producer's send, but LoadRunner introduced a large error for this kind of high-precision measurement (the Kafka sends are on the order of milliseconds; the smallest run was about 40 ms), probably because simulating the request in LoadRunner costs time by itself. So I switched to sending the requests directly from multithreaded code.

I am a bit embarrassed to admit I'm not that fluent in Java~
My first attempt just mindlessly did `new Thread()` in a for loop, starting 10 threads one after another, like this:

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProduce {

    static final ProducerConfig config;
    static {
        // producer configuration
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.28.191:9092,192.168.28.192:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class defaults to serializer.class
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // valid values are 0, 1 and -1; see
        // http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "0");
        config = new ProducerConfig(props);
    }

    static String topic = "kafka_test_1";
    private static byte[] data = new byte[1024];

    public static void main(String[] args) {
        try {
            // note: <= actually starts 11 threads, hence the 11 result lines below
            for (int i = 0; i <= 10; i++) {
                new Thread() {
                    // one producer per thread
                    Producer<String, String> producer = new Producer<String, String>(config);

                    @Override
                    public void run() {
                        long all = 0;
                        long begin = System.nanoTime();
                        for (int j = 0; j <= 1000; j++) { // 1001 sends per thread
                            producer.send(new KeyedMessage<String, String>(topic, new String(data)));
                        }
                        long end = System.nanoTime();
                        all += end - begin;
                        System.out.println(TimeUnit.NANOSECONDS.toMillis(all));
                    }
                }.start();
            }
        } catch (Exception e) {
            System.out.println("kafka send failed");
            e.printStackTrace();
        }
    }
}

Output:

1106
1637
1676
1682
1701
1751
1760
1777
1859
1913
1938

Today, while idly browsing a forum, I discovered there is such a thing as CountDownLatch, which can make all 10 threads start running at the same moment. For a concurrency test that should be more reasonable, so I reworked the code:
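Before wiring it into Kafka, here is a minimal, self-contained sketch of the start-gate pattern (the class and method names are my own invention, not from the original code): one latch released exactly once acts as the starting gun, and a second latch counts workers down as they finish.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchDemo {

    // Starts n threads that all block on a start gate, releases them
    // together, then waits for all of them; returns how many ran.
    static int runWorkers(int n) throws InterruptedException {
        final CountDownLatch start = new CountDownLatch(1);
        final CountDownLatch done = new CountDownLatch(n);
        final AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            new Thread(() -> {
                try {
                    start.await();          // wait for the starting gun
                    ran.incrementAndGet();  // stand-in for the real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();       // report completion
                }
            }).start();
        }
        start.countDown();  // fire: all workers proceed at once
        done.await();       // block until every worker has finished
        return ran.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWorkers(10)); // prints 10
    }
}
```

The key point is that `await()` on the start latch parks every thread at the same line, so the measured work begins concurrently instead of staggered by thread startup.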


import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProduce2 {

    static final ProducerConfig config;
    static {
        // producer configuration
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.28.191:9092,192.168.28.192:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class defaults to serializer.class
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // valid values are 0, 1 and -1; see
        // http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "0");
        config = new ProducerConfig(props);
    }

    static String topic = "kafka_test_1";
    private static byte[] data = new byte[1024];

    public static void main(String[] args) throws InterruptedException {

        // start gate: released once so all threads begin together
        final CountDownLatch begin = new CountDownLatch(1);
        // completion latch: counted down once per thread
        final CountDownLatch end = new CountDownLatch(10);

        for (int i = 0; i < 10; i++) {
            new Thread() {
                // one producer per thread
                Producer<String, String> producer = new Producer<String, String>(config);

                @Override
                public void run() {
                    try {
                        begin.await(); // wait for the starting gun
                        long start = System.nanoTime();
                        for (int j = 0; j <= 1000; j++) { // 1001 sends per thread
                            producer.send(new KeyedMessage<String, String>(topic, new String(data)));
                        }
                        long elapsed = System.nanoTime() - start;
                        System.out.println(TimeUnit.NANOSECONDS.toMillis(elapsed));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        end.countDown();
                    }
                }
            }.start();
        }

        begin.countDown();
        System.out.println("start....");
        end.await();
        System.out.println("done....");
    }
}

Output:

start....
1406
1618
1643
1708
1714
1768
1783
1837
1840
1858
done....

Then I put the tasks on a thread pool, like this:
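The same start-gate trick works with a fixed pool. A minimal sketch under my own names (no Kafka involved): submit all the tasks first, then open the gate, await completion, and shut the pool down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolDemo {

    // Submits n tasks to a fixed-size pool, gates them on a start
    // latch, waits for all of them, and returns how many ran.
    static int runTasks(int n, int poolSize) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        final CountDownLatch start = new CountDownLatch(1);
        final CountDownLatch done = new CountDownLatch(n);
        final AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            pool.submit(() -> {
                try {
                    start.await();          // park until the gate opens
                    ran.incrementAndGet();  // stand-in for the real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        start.countDown();  // release all queued tasks
        done.await();       // wait for every task to finish
        pool.shutdown();    // no more submissions; let workers exit
        return ran.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks(10, 10)); // prints 10
    }
}
```

One caveat: for all tasks to truly start together, the pool must be at least as large as the task count; with a smaller pool, queued tasks only reach `start.await()` as worker threads free up.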

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProduce3 {

    static final ProducerConfig config;
    static {
        // producer configuration
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.28.191:9092,192.168.28.192:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class defaults to serializer.class
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // valid values are 0, 1 and -1; see
        // http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "0");
        config = new ProducerConfig(props);
    }

    static String topic = "kafka_test_1";
    private static byte[] data = new byte[1024];

    public static void main(String[] args) throws InterruptedException {

        // start gate: released once so all tasks begin together
        final CountDownLatch begin = new CountDownLatch(1);
        // completion latch: counted down once per task
        final CountDownLatch end = new CountDownLatch(10);

        ExecutorService execu = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            Runnable r = new Runnable() {
                // one producer per task
                Producer<String, String> producer = new Producer<String, String>(config);

                public void run() {
                    try {
                        begin.await(); // wait for the starting gun
                        long all = 0;
                        // build the message once and reuse it
                        KeyedMessage<String, String> k =
                                new KeyedMessage<String, String>(topic, new String(data));
                        for (int j = 0; j <= 1000; j++) { // 1001 sends per task
                            long start = System.nanoTime();
                            producer.send(k);
                            all += System.nanoTime() - start;
                        }
                        System.out.println(TimeUnit.NANOSECONDS.toMillis(all));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        end.countDown();
                    }
                }
            };
            execu.submit(r);
        }

        begin.countDown();
        System.out.println("start....");
        end.await();
        System.out.println("done....");
        execu.shutdown();
    }
}

Output:

start....
1296
1463
1464
1578
1603
1630
1755
1771
1826
1854
done....

The latter two runs look a bit better, and starting all threads together is also the more reasonable way to drive a concurrency test!
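Since the threads start together, a rough aggregate QPS can be read off the slowest thread's elapsed time. A back-of-the-envelope sketch (`QpsCalc` is a hypothetical helper of mine; 1854 ms is the slowest figure from the thread-pool run, with roughly 1000 messages per thread):

```java
public class QpsCalc {

    // Rough aggregate throughput: total messages divided by the
    // wall-clock time of the slowest thread (all start together).
    static long qps(long messagesPerThread, int threads, long slowestMillis) {
        return messagesPerThread * threads * 1000L / slowestMillis;
    }

    public static void main(String[] args) {
        // 10 threads * ~1000 messages, slowest thread took 1854 ms
        System.out.println(qps(1000, 10, 1854)); // prints 5393
    }
}
```

So this setup pushed on the order of 5,000+ messages per second with acks=0, though the estimate is only as good as the assumption that the slowest thread bounds the whole run.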