课程咨询 :0571-56020834 QQ:3163902815

杭州软件测试培训

杭州软件测试培训 > 达内新闻 > 达内分析Rabbitmq和Kafka的性能测试
  • 达内分析Rabbitmq和Kafka的性能测试

    发布:杭州软件测试培训      来源:51测试网      时间:2016-03-10


  •     杭州达内软件测试培训专家给大家讲一下Rabbitmq和Kafka的性能测试。

        测试环境:ubuntu 15.10 64位 

        cpu:inter core i7-4790 3.60GHZ * 8 

        内存:16GB

        硬盘:ssd 120GB

        软件环境:rabbmitmq 3.6.0   kafka0.8.1  (均为单机本机运行)

        PS: 测试结果均为单操作测试,即生产的时候没有消费操作

        测试结果:

        kafka :消费速度: 37,586 /s  生产速度: 448,753 /s

        rabbitmq: 消费速度: 20,807 /s  生产速度  16.413 /s

        出现问题:

        rabbitmq 生产4分钟左右出现队列阻塞,无法继续添加数据,1分钟后恢复,再过大约1分钟又出现此现象并以约1分钟为间隔出现此问题。

        rabbitmq 生产对象时有不小的几率(约 1/20)添加队列失败,报出的错误是“tcp链接重置”

        其他并无任何问题

        结论:

        很明显的看出kafka的性能远超rabbitmq。不过这也是理所当然的,毕竟2个消息队列实现的协议是不一样的,处理消息的场景也大有不同。rabbitmq适合处理一些数据严谨的消息,比如说支付消息,社交消息等不能丢失的数 。kafka是批量操作切不报证数据是否能完整的到达消费者端,所以适合一些大量的营销消息的场景。

        代码:


    kafka:
    package main
    import (
    "github.com/Shopify/sarama"
    "os"
    "os/signal"
    "sync"
    "log"
    "time"
    )
    func main() {
    go producer()
    //    go consumer()
    time.Sleep(10*time.Minute)
    }
    func producer()  {
    config :=sarama.NewConfig()
    config.Producer.Return.Successes = true
    proder,err := sarama.NewAsyncProducer([]string{"localhost:9092"},config)
    if err != nil {
    panic(err)
    }
    signals :=make(chan  os.Signal,1)
    signal.Notify(signals,os.Interrupt)
    var (
    wg                          sync.WaitGroup
    enqueued, successes, errors int
    )
    wg.Add(1)
    go func() {
    defer  wg.Done()
    for _=range proder.Successes(){
    successes++
    }
    }()
    wg.Add(1)
    go func() {
    defer wg.Done()
    for err := range proder.Errors(){
    log.Println(err)
    errors++
    }
    }()
    go func() {
    t1 := time.NewTicker(time.Second)
    for{
    <- t1.C
    log.Println(enqueued)
    }
    }()
    ProducerLoop:
    for{
    message :=&sarama.ProducerMessage{Topic:"test",Value:sarama.StringEncoder("testing 123")}
    select {
    case proder.Input() <- message:
    enqueued++
    case <- signals:
    proder.AsyncClose()
    break ProducerLoop
    }
    }
    wg.Wait()
    log.Println("Successfully produced:%d;errors:%d\n",successes,errors)
    }
    func consumer()  {
    coner,err := sarama.NewConsumer([]string{"localhost:9092"},nil)
    if err != nil {
    panic(err)
    }
    defer func() {
    if err :=coner.Close(); err !=nil{
    log.Fatalln(err)
    }
    }()
    partitionConsumer ,err := coner.ConsumePartition("test",0,sarama.OffsetNewest)
    if err != nil {
    panic(err)
    }
    defer func() {
    if err := partitionConsumer.Close();err!=nil{
    log.Fatalln(err)
    }
    }()
    signals := make(chan os.Signal,1)
    signal.Notify(signals,os.Interrupt)
    consumed:=0
    go func() {
    t1 := time.NewTicker(time.Second)
    for{
    <- t1.C
    log.Println(consumed)
    }
    }()
    ConsumerLoop:
    for{
    select {
    case _ = <-partitionConsumer.Messages():
    consumed++
    //            log.Println( string(msg.Value),"  =>  ",consumed)
    case <-signals:
    break ConsumerLoop
    }
    }
    log.Printf("Consumed: %d\n", consumed)
    }


    rabbitmq:


    package main
    import (
    "github.com/streadway/amqp"
    "time"
    "fmt"
    "log"
    )
    const (
    queueName = "push.msg.q"
    exchange  = "t.msg.ex"
    mqurl ="amqp://shimeng:shimeng1015@192.168.155.106:5672/push"
    )
    var conn *amqp.Connection
    var channel *amqp.Channel
    func main() {
    fmt.Println(1)
    //    push()
    receive()
    //    fmt.Println("end")
    //    close()
    }
    func failOnErr(err error, msg string) {
    if err != nil {
    log.Fatalf("%s:%s", msg, err)
    panic(fmt.Sprintf("%s:%s", msg, err))
    }
    }
    func mqConnect() {
    var err error
    conn, err = amqp.Dial(mqurl)
    if err != nil {
    log.Println(1)
    log.Fatalln(err)
    }
    fmt.Println(5)
    channel, err = conn.Channel()
    if err != nil {
    fmt.Println(2)
    log.Fatalln(err)
    }else {
    fmt.Println("a")
    }
    }
    func push() {
    count := 0
    if channel == nil {
    fmt.Println(2)
    mqConnect()
    }else {
    fmt.Println(3)
    }
    msgContent := "hello world!"
    t1 := time.NewTicker(time.Second)
    go func() {
    for{
    <- t1.C
    log.Println(count)
    }
    }()
    for{
    err := channel.Publish(exchange, "test", false, false, amqp.Publishing{ 
    ContentType: "text/plain",
    Body:        []byte(msgContent),
    })
    if err != nil {
    }else {
    count ++
    }
    }
    }
    func receive() {
    if channel == nil {
    mqConnect()
    }
    count :=0
    msgs, err := channel.Consume(queueName, "", true, false, false, false, nil)
    failOnErr(err, "")
    forever := make(chan bool)
    t1 := time.NewTicker(time.Second)
    go func() {
    for{
    <- t1.C
    log.Println(count)
    }
    }()
    go func() {
    //fmt.Println(*msgs)
    for _= range msgs {
    count ++
    //            s := BytesToString(&(d.Body))
    //            count++
    //            fmt.Printf("receve msg is :%s -- %d\n", *s, count)
    }
    }()
    fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
    <-forever
    }




    原文链接:http://www.51testing.com/html/61/n-3707461.html
    推荐文章

上一篇:Vertica导出数据测试用例

下一篇:达内:导入测试用例的设计

最新开班日期  |  更多

国际软件测试工程师精品班

国际软件测试工程师精品班

开班日期:每月底

国际软件测试工程师提升班

国际软件测试工程师提升班

开班日期:每月底

国际软件测试工程师就业班

国际软件测试工程师就业班

开班日期:每月底

国际软件测试工程师就业班

国际软件测试工程师就业班

开班日期:每月底

  • 地址:杭州市西湖区文三路199号创业大厦4楼
  • 课程培训电话:0571-56020834 QQ:3163902815     全国服务监督电话:400-827-0010
  • 服务邮箱 ts@tedu.cn
  • 2001-2016 达内国际公司(TARENA INTERNATIONAL,INC.) 版权所有 京ICP证08000853号-56