“大道至简。”
前言
通过上一篇的配置,我们就搭建好了初赛所需要的开发及测试的环境。接下来就是对赛题进行分析、拓扑设计以及码代码了。
正文
目录
题目分析
赛题官方介绍 初赛的任务就是写一个topology来完成对大量实时交易信息的简单计算,整个任务流程就是利用JStorm从RocketMQ拉取数据,按照要求实时计算出结果,并将结果写入Tair。
拓扑设计
在这个过程中尝试了许多种拓扑结构,最终的结构如下图所示:
从最后的结果来看,这种拓扑并不算很好。在初赛结束后与其他选手进行交流发现前几名的拓扑结构都设计的简单巧妙,因此了解到拓扑的流水线不能太长,否则消息在线程间的传递会占用大量时间,从而使得最后的平均处理时间变长。
代码展示
拓扑的结构代码如下:
package com.alibaba.middleware.race.jstorm;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.middleware.race.Constants;
import com.alibaba.middleware.race.RaceConfig;
import com.alibaba.middleware.race.util.FileUtil;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
* 这是一个很简单的例子
* 选手的拓扑提交到集群,我们是有超时设置的。每个选手的拓扑最多跑20分钟,一旦超过这个时间
* 我们会将选手拓扑杀掉。
*/
/**
* 选手拓扑入口类,我们定义必须是com.alibaba.middleware.race.jstorm.RaceTopology
* 因为我们后台对选手的git进行下载打包
* ,拓扑运行的入口类默认是com.alibaba.middleware.race.jstorm.RaceTopology; 所以这个主类路径一定要正确
*/
public class RaceTopology {
private static Logger LOG = LoggerFactory.getLogger(RaceTopology.class);
/** Spout **/
private static final int AllSpoutParallelism = 4;
public static final String ALLSPOUT = "AllSpout";
/** Platform Distinguish **/
private static final int PlatformParallelism = 12;
public static final String PLATFORMBOLT = "PlatformBolt";
public static final String TMPAYSTREAM = "TMPayStream";
public static final String TBPAYSTREAM = "TBPayStream";
public static final String ALLPAYSTREAM = "AllPayStream";
/** PayMsgPartSum **/
private static final int PayMsgPartSumParallelism = 8;
public static final String PAY_MSG_PART_SUM_BOLT = "PayMsgPartSum";
/** Writer Bolt **/
private static final int TMCounterWriterParallelism = 4;
public static final String TMCOUNTERWRITERBOLT = "TMCounterWriter";
private static final int TBCounterWriterParallelism = 4;
public static final String TBCOUNTERWRITERBOLT = "TBCounterWriter";
private static final int RatioWriterParallelism = 1;
public static final String RATIOWRITERBOLT = "RatioWriter";
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
/** Spout **/
builder.setSpout(ALLSPOUT, new AllSpout(), AllSpoutParallelism);
/** Bolts receive tuples form spout **/
builder.setBolt(PLATFORMBOLT, new PlatformDistinguish(),
PlatformParallelism).fieldsGrouping(ALLSPOUT,
new Fields("orderID"));
/** tm/tb Writer Bolt **/
builder.setBolt(TMCOUNTERWRITERBOLT, new TMCounterWriter(),
TMCounterWriterParallelism).fieldsGrouping(PLATFORMBOLT,
TMPAYSTREAM, new Fields("time"));
builder.setBolt(TBCOUNTERWRITERBOLT, new TBCounterWriter(),
TBCounterWriterParallelism).fieldsGrouping(PLATFORMBOLT,
TBPAYSTREAM, new Fields("time"));
/** ratio related **/
builder.setBolt(PAY_MSG_PART_SUM_BOLT, new PayMsgPartSum(),
PayMsgPartSumParallelism).fieldsGrouping(PLATFORMBOLT,
ALLPAYSTREAM, new Fields("time"));
builder.setBolt(RATIOWRITERBOLT, new NewRatioWriter(),
RatioWriterParallelism).globalGrouping(PAY_MSG_PART_SUM_BOLT);
String topologyName = RaceConfig.JstormTopologyName;
Config conf = new Config();
conf.setNumWorkers(4);
conf.setNumAckers(0);
// conf.setMessageTimeoutSecs(90);
// conf.setMaxSpoutPending(RaceConfig.SpoutMaxPending);
try {
StormSubmitter.submitTopology(topologyName, conf,
builder.createTopology());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
关键优化
- 根据日志分析拓扑的瓶颈,从而多次改变拓扑结构
- 减轻Spout工作量,并改为支持多线程操作, 这个优化十分重要,将性能提升了2-3倍
- message去重(由于RocketMQ的消息在被多线程消费时是不安全的,因此需要在代码中进行去重),这个优化将最后的准确率提升到了100%
- 其他还有一些优化手段,都是根据日志来进行调整的,并不是太关键
后记
通过初赛学到了实时流数据计算框架的一些基本知识,了解到了淘宝双十一后端的一些技术。
从学习新技术的角度出发,我认为在初赛学到的东西比复赛还要多一些。年轻人嘛,不能总是用自己熟悉的东西,还是要多多去学习和拥抱新技术的,不然就真的是名副其实的码农了。