NoSQL就此Storm轻松实时大数目解析【翻译】

原文地址

简单易行好用,Storm让雅数目解析变得好。

现,公司当一般运行着时会生出TB(terabytes)级的数量。数据来源于包括由网络传感器捕获的,到Web,社交媒体,交易型业务数据,以及任何事情环境遭到创设的数额。考虑到数量的生成量,实时总计(real-time
computation
)已改成许多社团面临的一个宏大挑战。我们早已有效地运用了一个可扩张的实时总结体系——开源的
Storm 工具,它是发 Twitter
开发,通常被号称“实时 Hadoop(real-time Hadoop)”。可是,Storm 远远比
Hadoop 简单,因为其并不需要明白新技巧处理好数据。

正文介绍了哪使 Storm。示例项目名为“超速警报系统(Speeding Alert
System),”分析实时数据,当车速过一个预定义的阈值(threshold)时,触发一个
trigger,相关数据就是会晤保留至数据库中。

什么是Storm


Hadoop 依靠批量拍卖(batch processing),而 Storm
是一个实时的(real-time),分布式的(distributed),容错的(fault-tolerant),总计连串。像
Hadoop,它可以保可靠性处理大量之数量,但未克实时;也就是说,每个信息都用于处理。Storm
也提供这么些特征,如容错,分布式总括,这多少个使它们可当不同机器上拍卖大规模数据。它还保有如下特点:

  • 简单扩张。若想扩张,你就待加上设备与更改 topology
    的并行性设置。用于集群协调的 Hadoop Zookeeper 用当 Storm
    使得它非凡容易扩张。
  • 保每个音信都于处理。
  • Storm 集群(cluster)很轻管理。
  • 容错性:一旦 topology 被交给,Storm 运行
    topology,直到其为丰硕掉或者集群为关闭。其余,假诺举行中发生误,那么重新分配的天职由
    Storm 处理。
  • Storm 的 topology 能够用此外语言定义,但常见依然故 Java。

章接下的一对,你首先需装和创建 Storm。步骤如下:

  • Storm 官方站点下载 Storm.
  • 解压,将 bin/ 添加到公的环境变量 PATH,保证 bin/storm 脚论可举行。

Hadoop Map/Reduce
的多少处理过程是,从HDFS中获取数据,分片后,举办分布式处理,最后输出结果。

Hadoop 和 Storm 的定义比较,如下表所示:

Hadoop

Storm

JobTracker

Nimbus

TaskTracker

Supervisor

Child

Worker

Job

Topology

Mapper/Reducer

Spout/Bolt

Twitter 列举了Storm的老三挺类使用:

  1. 音讯流处理(Stream
    processing)。Storm可用来实时处理新数据及革新数据库,兼具容错性和而扩充性。即Storm能够用来处理源源不断流进去的信息,处理将来以结果写副到某存储着去。

  2. 连续总括(Continuous
    computation)。Storm可举办连接查询并将结果虽平日报告给客户端。比如将Twitter上的热门话题发送至浏览器中。

  3. 分布式远程程序调用(Distributed
    RPC)。Storm可用来并行处理密集查询。Storm的拓扑结构是一个等候调用新闻的布函数,当她接受一模一样长达调用信息后,会对查询举行总括,并回查询结果。举个例子Distributed
    RPC可以开并行搜索依然处理很是集合的多少。

透过部署drpc服务器,将storm的topology公布为drpc服务。客户端程序可以调用drpc服务将数据发送到storm集群中,并收取处理结果的反映。这种方法需要drpc服务器举办转发,其中drpc服务器底层通过thrift实现。适合的作业场景首如果实时总括。并且扩充性卓越,可以增添每个节点的办事worker数量来动态扩张。

  1. 品类推行,构建Topology。

Storm组件


Storm 集群紧假设因为主节点(master)和行事节点(worker node)组成,它们通过
Zookeeper 举行协调。Nimbus类似Hadoop里面的 JobTracker。Nimbus
负责在集群内分发代码,分配总计任务为机器, 并且监控状态。

主节点(master)——Nimbus

主节点运行一个护理进程(daemon),Nimbus,它负责在会聚众多被遍布代码,分配任务(Task)并监测故障。它仿佛于
Hadoop 的 Job Tracker。

劳引力节点(worker node)——Supervisor

行事节点同样会运行一个医护进程,Supervisor,它监听已分配的做事,并依据要求运行工作经过。每个工作节点都实施一个
topology 的子集。Nimbus 和 Supervisor 之间的和谐是由于 Zookeeper
或该集群来管理。

Zookeeper

Zookeeper 负责 Supervisor 和 Nimbus
之间的调和。一个实时应用程序的逻辑给包裹到一个 Storm
的“topology”中。一个 topology 是由于同样组 spouts(数据源)和
bolts(数据操作)组成,通过 Stream Groupings
连接(协调)。下边又进一步求证这多少个术语。

Spout

简单来讲来说,一个 spout 在 topology 中由一个源中读取数据。spout
可以是牢靠的,也堪是不可靠的。如若 Storm 处理失利,那么一个可靠的
spout 可以保重新发送元组(它是一个多少项之稳步列表)。一个不可靠的
spout,元组一旦发送,它不碰面跟。spout 中之严重性格局是
nextTuple()。该种方法仍然朝 topology
发出一个新元组,或是直接回到,假若没什么可发。

Bolt

bolt 负责所有拍卖处理 topology 发生的方方面面。 bolt
可举办打过滤到连,聚合,写文件/数据库等等任何事。bolt 于一个 spout
接收数据来拍卖,在复杂流转换中,它可进一步暴发元组到此外一个 bolt。bolt
中重要措施是 execute(),它接受一个元组作为输入。在 spout 和
bolt,发动元组到还多之流,可以在 declareStream() 中声明与指定流。

Stream Grouping

stream grouping 定义流在 bolt 任务中如何吃分开。Storm
提供了嵌入的流分组:随机分组(shuffle
grouping),域组域(fields grouping),所有分组(all
grouping),单一分组(one grouping),直接分组(direct
grouping)和本地/随机分组(local/shuffle
grouping)。自定义分组实现可应用 CustomStreamGrouping 接口。

  • 轻易分组(Shuffle
    grouping):随机分发tuple到Bolt的职责,保证每个任务得到相当于数量的tuple。
  • 字段分组(菲尔德(Field)s
    grouping):依照指定字段分割数据流,并分组。例如,按照“user-id”字段,相同“user-id”的元组总是分发到与一个职责,不同“user-id”的元组可能分发及不同之职责。
  • 全方位分组(All
    grouping):tuple被复制到bolt的具有任务。这类别型需要小心翼翼使用。
  • 全局分组(Global
    grouping):全体流都分配至bolt的及一个职责。明确地说,是分配为ID最小之这一个task。
  • 凭分组(None
    grouping):你不需要关心流是安分组。近日,无分组等效于随机分组。但说到底,Storm将拿管分组的Bolts放到Bolts或Spouts订阅它们的同一线程去实施(假设可能)。
  • 直白分组(Direct
    grouping):这是一个专门之分组类型。元组生产者决定tuple由哪个元组处理者任务接收。

此外,还提到其他概念。

Task

worker 中每一个 spout/bolt 的线程称为一个 task。在 storm0.8 之后,task
不再与物理线程对应,同一个 spout/bolt 的 task
可能会晤共享一个大体线程,该线程称为 executor。

Tuple

一律涂鸦信息传递的骨干单元。本来当是一个 key-value 的
map,然而由各类零部件间传递的tuple的字段名称都先期定义好,所以,tuple
中使坚守次填入各样value 就尽了,所以就是是一个 value list。

Topology

storm中运作的一个实时应用程序,因为各类零部件间的信息流动形成逻辑上的一个拓扑结构。一个topology是spouts和bolts组成的觊觎,
通过stream groupings将图备受的spouts和bolts连接起来,如下图:

NoSQL 1

一个 topology 会从来运转,直到你 kill
掉她,Storm自动地重新分配执行破产的任务,
并且Storm可以确保你无碰面时有发生数量丢失(假使翻开了高可靠性的语)。虽然有机意外停机它点的具备任务会叫更换到其他机器上。

运转一个topology很简单。首先,把你有着的代码和所倚重之jar打进一个jar包。然后运行类似上面的这一个命令:

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2 

斯令会运作主类: backtype.strom.MyTopology, 参数是arg1,
arg2。这么些近乎的main函数定义之topology并且将她交给给Nimbus。storm
jar负责连接到Nimbus并且达传jar包。

Topology的概念是一个Thrift结构,并且Nimbus就是一个Thrift服务,
你可交给由外语言创造的topology。下面的点是因而JVM-based语言提交的太简便的主意。

Stream

源源不断传递的tuple就做了stream。消息流stream是storm里的首要抽象。一个音讯流是一个尚未边界的tuple连串,
而那一个tuple系列会以同等种分布式的格局相地创立及处理。通过对stream中tuple连串中每个字段命名来定义stream。在默认的情下,tuple的字段类型可以是:integer,long,short,
byte,string,double,float,boolean和byte
array。你啊堪起定义类型(只要实现相应的系列化器)。

每个信息流在概念之下会被分配受一个id,因为光为音信流使用的非凡普遍,
OutputFieldsDeclarer
定义了有些方法吃您得定义一个stream而毫无指定这些id。在这种气象下此stream会分配个价吗‘default’默认的id

Storm提供的极中央的拍卖stream的原语是spout和bolt。你得兑现spout和bolt提供的接口来拍卖你的事务逻辑。

NoSQL 2 

实现


于我们的演示中,我们筹了一个 spout 和 bolt 的
topology,可以拍卖大量圈数量(日志文件),设计触发一个报警,当一个特定值领先预设阈值时。使用
Storm 的 topology,日志文件按行读取,topology 监控到之数量。在 Storm
组件,spout
读取到来的数。它不只打现存的文书被读取数据,也监控新文件。一旦文件于涂改,spout
读取新条令,转换为元组(一个好吃 bolt 读取的格式)后,把元组发出到
bolt 执行阈值分析,查找任何抢先阈值的记录。

阈值分析(Threshold Analysis)


本节最主要集中零星种植档次的阈值(threshold)分析:瞬时阈值(instant
thershold)和时空连串阈值(time series threshold)。

  • 一晃儿阈值监测:一个字段的值在非凡刹那间超越了预设的逼值,尽管条件适合的言辞则触发一个trigger。举个例子当车子过80英里各时,则触发trigger。
  • 日连串阈值监测:字段的价当一个加的岁月段内超越了预设的薄值,倘若条件适合则触发一个触发器。比如:在5分钟内,时速超过80英里各时两糟以及以上之车。

清单 1
显示一个我们用的日记文件,它蕴含车辆数量音讯,例如车辆编号,速度,地方。

清单 1:日志文件,通过检查点的车辆音信

AB 123, 60, North city

BC 123, 70, South city

CD 234, 40, South city

DE 123, 40, East city

EF 123, 90, South city

GH 123, 50, West city

创设相应的XML文件,它由到的多寡格式组成。用于解析日志文件。架构 XML
及其相应的叙述如下表所示。

NoSQL 3

XML文件及日志文件都被 spout 随时监测,本例使用的 topology 如下图所显示。

NoSQL 4

贪图 1:Storm中开创的 topology,以处理实时数据

如图1所示,FilelistenerSpout 接收输入日志,并逐行读取,把数量发送给
ThresoldCalculatorBolt
进一步的阈值处理。一旦处理完,按照阈值总结的行被发动到
DBWriterBolt,持久化到数据库(或发出报警)。这么些历程的求实实现以以下边介绍。

Spout 实现


spout
把日记文件以及XML描述符文件作为输入。该XML文件指定了日记文件之格式。现在考虑一个例子的日志文件,它富含车辆音信,如车子编号,速度,地点等三单音信。如图
2 所示。

NoSQL 5

贪图 2:数据由日记文件及 spout 的流程图

列表 2
突显了tuple对应的XML,其中指定了字段、将日志文件切割成字段的定界符以以及字段的品类。XML文件与数都为保存至Spout指定的门路。

列表 2:用以描述日志文件的XML文件

<TUPLEINFO>

              <FIELDLIST>

                  <FIELD>

                            <COLUMNNAME>vehicle_number</COLUMNNAME> 

                            <COLUMNTYPE>string</COLUMNTYPE> 

                  </FIELD>



                  <FIELD>

                            <COLUMNNAME>speed</COLUMNNAME> 

                            <COLUMNTYPE>int</COLUMNTYPE> 

                  </FIELD>



                  <FIELD>

                             <COLUMNNAME>location</COLUMNNAME> 

                             <COLUMNTYPE>string</COLUMNTYPE> 

                  </FIELD>

              </FIELDLIST>  

           <DELIMITER>,</DELIMITER> 

</TUPLEINFO>

构造函数用参数 Directory、PathSpout 和 TupleInfo 对象创造 Spout
对象。TupleInfo
储存与日志文件有关的画龙点睛音讯,如字段、分隔符、字段类型等。该对象通过XSTream序列化XML来建建。

Spout实现步骤:

  • 监听一个独日志文件的变通。监控目录是否上加新的日记文件。
  • 声称字段后,把 spout 读取行转换成 tuple。
  • 扬言Spout和Bolt之间的分组,并控制tuple发送给Bolt的主意。

Spout 代码如下列表 3 所著。

列表 3:Spout中 open、nextTuple 和 delcareOutputFields 方法

public void open( Map conf, TopologyContext context,SpoutOutputCollector collector ) 

{

    _collector = collector;

    try

    {

    fileReader  =  new BufferedReader(new FileReader(new File(file)));

    } 

    catch (FileNotFoundException e) 

    {

    System.exit(1);

    }

}



public void nextTuple() 

{

    protected void ListenFile(File file) 

    {

    Utils.sleep(2000);

    RandomAccessFile access = null; 

    String line = null;                  

       try

       { 

           while ((line = access.readLine()) != null)

           { 

               if (line !=null)

               {

                    String[] fields=null;

                    if (tupleInfo.getDelimiter().equals("|"))

                       fields = line.split("\\"+tupleInfo.getDelimiter());

                    else                                                                                                             fields = line.split(tupleInfo.getDelimiter());                                                

                    if (tupleInfo.getFieldList().size() == fields.length)

                       _collector.emit(new Values(fields)); 

               }          

           } 

      } 

      catch (IOException ex) { }              

      }

}



public void declareOutputFields(OutputFieldsDeclarer declarer) 

{

     String[] fieldsArr = new String [tupleInfo.getFieldList().size()];

     for(int i=0; i<tupleInfo.getFieldList().size(); i++)

     {

         fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();

     }           

     declarer.declare(new Fields(fieldsArr));

}   

declareOutputFileds()
决定tuple发送的格式,这样,Bolt就会为此接近的计编码
tuple。Spout持续监听添加到日志文件之数码,一旦闹数量增长,它就是读取并将多少发送给
bolt 处理。

Bolt 实现


Spout
输出结果用给予Bolt举办双重丰裕一步之拍卖。经过对用例的思维,我们的topology中待而图
3遭到的个别单Bolt。

NoSQL 6

贪图 3:Spout到Bolt的数据流程

ThresholdCalculatorBolt

Spout将tuple发出,由ThresholdCalculatorBolt接收并展开临界值处理。在此地,它以吸纳好几宗输入举办检讨;分别是:

逼价值检查

  • 阈值栏数检查(拆分成字段的数量)
  • 阈值数据类型(拆分后字段的类)
  • 阈值出现的频数
  • 阈值时段检查

列表 4中的接近,定义用来保存这多少个价值。

public class ThresholdInfo implements Serializable

{

    private String action;

    private String rule;

    private Object thresholdValue;

    private int thresholdColNumber;

    private Integer timeWindow;

    private int frequencyOfOccurence;

}

据悉字段中提供的价值,阈值检查将给以 execute() 方法执行,如列表 5
所出示。代码大部分的功效是分析和检测到之价。

列表 5:阈值检测代码段

public void execute(Tuple tuple, BasicOutputCollector collector) 

{

    if(tuple!=null)

    {

        List<Object> inputTupleList = (List<Object>) tuple.getValues();

        int thresholdColNum = thresholdInfo.getThresholdColNumber();

        Object thresholdValue = thresholdInfo.getThresholdValue();

        String thresholdDataType = 

            tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();

        Integer timeWindow = thresholdInfo.getTimeWindow();

        int frequency = thresholdInfo.getFrequencyOfOccurence();



        if(thresholdDataType.equalsIgnoreCase("string"))

        {

            String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();

            String frequencyChkOp = thresholdInfo.getAction();

            if(timeWindow!=null)

            {

                long curTime = System.currentTimeMillis();

                long diffInMinutes = (curTime-startTime)/(1000);

                if(diffInMinutes>=timeWindow)

                {

                    if(frequencyChkOp.equals("=="))

                    {

                         if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))

                         {

                             count.incrementAndGet();

                             if(count.get() > frequency)

                                 splitAndEmit(inputTupleList,collector);

                         }

                    }

                    else if(frequencyChkOp.equals("!="))

                    {

                        if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))

                        {

                             count.incrementAndGet();

                             if(count.get() > frequency)

                                 splitAndEmit(inputTupleList,collector);

                         }

                     }

                     else

                         System.out.println("Operator not supported");

                 }

             }

             else

             {

                 if(frequencyChkOp.equals("=="))

                 {

                     if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))

                     {

                         count.incrementAndGet();

                         if(count.get() > frequency)

                             splitAndEmit(inputTupleList,collector);    

                     }

                 }

                 else if(frequencyChkOp.equals("!="))

                 {

                      if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))

                      {

                          count.incrementAndGet();

                          if(count.get() > frequency)

                              splitAndEmit(inputTupleList,collector);   

                      }

                  }

              }

           }

           else if(thresholdDataType.equalsIgnoreCase("int") || 

                   thresholdDataType.equalsIgnoreCase("double") || 

                   thresholdDataType.equalsIgnoreCase("float") || 

                   thresholdDataType.equalsIgnoreCase("long") || 

                   thresholdDataType.equalsIgnoreCase("short"))

           {

               String frequencyChkOp = thresholdInfo.getAction();

               if(timeWindow!=null)

               {

                    long valueToCheck = 

                        Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());

                    long curTime = System.currentTimeMillis();

                    long diffInMinutes = (curTime-startTime)/(1000);

                    System.out.println("Difference in minutes="+diffInMinutes);

                    if(diffInMinutes>=timeWindow)

                    {

                         if(frequencyChkOp.equals("<"))

                         {

                             if(valueToCheck < Double.parseDouble(thresholdValue.toString()))

                             {

                                  count.incrementAndGet();

                                  if(count.get() > frequency)

                                      splitAndEmit(inputTupleList,collector);

                             }

                         }

                         else if(frequencyChkOp.equals(">"))

                         {

                              if(valueToCheck > Double.parseDouble(thresholdValue.toString())) 

                              {

                                  count.incrementAndGet();

                                  if(count.get() > frequency)

                                      splitAndEmit(inputTupleList,collector);

                              }

                          }

                          else if(frequencyChkOp.equals("=="))

                          {

                             if(valueToCheck == Double.parseDouble(thresholdValue.toString()))

                             {

                                 count.incrementAndGet();

                                 if(count.get() > frequency)

                                     splitAndEmit(inputTupleList,collector);

                              }

                          }

                          else if(frequencyChkOp.equals("!="))

                          {

   . . . 

                          }

                      }



             }

     else

          splitAndEmit(null,collector);

     }

     else

     {

          System.err.println("Emitting null in bolt");

          splitAndEmit(null,collector);

     }

}

冲阈值 bolt 发送的 tuple 被发送至下一个应和的Bolt,在大家的用例中凡
DBWriterBolt

DBWriterBolt

既处理的tuple必须于持久化,以便为触发tigger或者往后应用。DBWiterBolt
完成的工作是以 tuple 持久化到数据库。表底立是由 prepare()
完成,这吗是topology调用之率先单主意。该模式的代码如列表 6 所呈现。

列表 6:创制表的代码

public void prepare( Map StormConf, TopologyContext context ) 

{       

    try

    {

        Class.forName(dbClass);

    } 

    catch (ClassNotFoundException e) 

    {

        System.out.println("Driver not found");

        e.printStackTrace();

    }



    try

    {

       connection driverManager.getConnection( 

           "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd);

       connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute();



       StringBuilder createQuery = new StringBuilder(

           "CREATE TABLE IF NOT EXISTS "+tableName+"(");

       for(Field fields : tupleInfo.getFieldList())

       {

           if(fields.getColumnType().equalsIgnoreCase("String"))

               createQuery.append(fields.getColumnName()+" VARCHAR(500),");

           else

               createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+",");

       }

       createQuery.append("thresholdTimeStamp timestamp)");

       connection.prepareStatement(createQuery.toString()).execute();



       // Insert Query

       StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"(");

       String tempCreateQuery = new String();

       for(Field fields : tupleInfo.getFieldList())

       {

            insertQuery.append(fields.getColumnName()+",");

       }

       insertQuery.append("thresholdTimeStamp").append(") values (");

       for(Field fields : tupleInfo.getFieldList())

       {

           insertQuery.append("?,");

       }



       insertQuery.append("?)");

       prepStatement = connection.prepareStatement(insertQuery.toString());

    }

    catch (SQLException e) 

    {       

        e.printStackTrace();

    }       

}

多少的插是劈批次完成的。插入的逻辑由 execute() 方法供,如列表 7
所显示。大部分代码是分析各样不同输入型。

列表 7:数据插入的代码有

public void execute(Tuple tuple, BasicOutputCollector collector) 

{

    batchExecuted=false;

    if(tuple!=null)

    {

       List<Object> inputTupleList = (List<Object>) tuple.getValues();

       int dbIndex=0;

       for(int i=0;i<tupleInfo.getFieldList().size();i++)

       {

           Field field = tupleInfo.getFieldList().get(i);

           try {

               dbIndex = i+1;

               if(field.getColumnType().equalsIgnoreCase("String"))             

                   prepStatement.setString(dbIndex, inputTupleList.get(i).toString());

               else if(field.getColumnType().equalsIgnoreCase("int"))

                   prepStatement.setInt(dbIndex,

                       Integer.parseInt(inputTupleList.get(i).toString()));

               else if(field.getColumnType().equalsIgnoreCase("long"))

                   prepStatement.setLong(dbIndex, 

                       Long.parseLong(inputTupleList.get(i).toString()));

               else if(field.getColumnType().equalsIgnoreCase("float"))

                   prepStatement.setFloat(dbIndex, 

                       Float.parseFloat(inputTupleList.get(i).toString()));

               else if(field.getColumnType().equalsIgnoreCase("double"))

                   prepStatement.setDouble(dbIndex, 

                       Double.parseDouble(inputTupleList.get(i).toString()));

               else if(field.getColumnType().equalsIgnoreCase("short"))

                   prepStatement.setShort(dbIndex, 

                       Short.parseShort(inputTupleList.get(i).toString()));

               else if(field.getColumnType().equalsIgnoreCase("boolean"))

                   prepStatement.setBoolean(dbIndex, 

                       Boolean.parseBoolean(inputTupleList.get(i).toString()));

               else if(field.getColumnType().equalsIgnoreCase("byte"))

                   prepStatement.setByte(dbIndex, 

                       Byte.parseByte(inputTupleList.get(i).toString()));

               else if(field.getColumnType().equalsIgnoreCase("Date"))

               {

                  Date dateToAdd=null;

                  if (!(inputTupleList.get(i) instanceof Date))  

                  {  

                       DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");

                       try

                       {

                           dateToAdd = df.parse(inputTupleList.get(i).toString());

                       }

                       catch (ParseException e) 

                       {

                           System.err.println("Data type not valid");

                       }

                   }  

                   else

                   {

            dateToAdd = (Date)inputTupleList.get(i);

            java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());

            prepStatement.setDate(dbIndex, sqlDate);

            }   

            } 

        catch (SQLException e) 

        {

             e.printStackTrace();

        }

    }

    Date now = new Date();          

    try

    {

        prepStatement.setTimestamp(dbIndex+1, new java.sql.Timestamp(now.getTime()));

        prepStatement.addBatch();

        counter.incrementAndGet();

        if (counter.get()== batchSize) 

        executeBatch();

    } 

    catch (SQLException e1) 

    {

        e1.printStackTrace();

    }           

   }

   else

   {

        long curTime = System.currentTimeMillis();

       long diffInSeconds = (curTime-startTime)/(60*1000);

       if(counter.get()<batchSize && diffInSeconds>batchTimeWindowInSeconds)

       {

            try {

                executeBatch();

                startTime = System.currentTimeMillis();

            }

            catch (SQLException e) {

                 e.printStackTrace();

            }

       }

   }

}



public void executeBatch() throws SQLException

{

    batchExecuted=true;

    prepStatement.executeBatch();

    counter = new AtomicInteger(0);

}

假定Spout和Bolt准备妥当(等待给实践),topology生成器将会确立topology并实施。下面就来拘禁一下行步骤。

于本土集群达运行与测试topology

  • 通过TopologyBuilder建立topology。
  • 接纳Storm
    Submitter,将topology递交给集群。以topology的名字、配置与topology的目的作为参数。
  • 提交topology。

列表 8:建立和实践topology

public class StormMain

{

     public static void main(String[] args) throws AlreadyAliveException, 

                                                   InvalidTopologyException, 

                                                   InterruptedException 

     {

          ParallelFileSpout parallelFileSpout = new ParallelFileSpout();

          ThresholdBolt thresholdBolt = new ThresholdBolt();

          DBWriterBolt dbWriterBolt = new DBWriterBolt();

          TopologyBuilder builder = new TopologyBuilder();

          builder.setSpout("spout", parallelFileSpout, 1);

          builder.setBolt("thresholdBolt", thresholdBolt,1).shuffleGrouping("spout");

          builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt");

          if(this.argsMain!=null && this.argsMain.length > 0) 

          {

              conf.setNumWorkers(1);

              StormSubmitter.submitTopology( 

                   this.argsMain[0], conf, builder.createTopology());

          }

          else

          {    

              Config conf = new Config();

              conf.setDebug(true);

              conf.setMaxTaskParallelism(3);

              LocalCluster cluster = new LocalCluster();

              cluster.submitTopology(

              "Threshold_Test", conf, builder.createTopology());

          }

     }

}

开创 topology 后,提交到本地集群。一旦topology被交给,除非叫 kill
或者因修改要关门大吉集群,否则它们将直运转。这吗是Storm一怪优势之一。

本例展示起与应用Storm,一旦您精通topology、spout和bolt这多少个基本概念,将会非凡轻。固然您处理好数据,又不惦记用
Hadoop,那么以 Storm 是一个相当好之挑选。

Storm常见问题解答


  • 平等、我发生一个数据文件,或者自身发生一个系内部有多少,怎么导入storm做总括?

您待贯彻一个Spout,Spout负责将数据emit到storm系统里,交给bolts总括。怎么落实spout可以参见官方的kestrel
spout实现:

https://github.com/nathanmarz/storm-kestrel

而你的数据源不协助事务性消费,那么尽管不可能拿到storm提供的笃定处理的承保,也一贯不必要实现ISpout接口中之ack和fail方法。

  • 老二、Storm为了保险tuple的可靠处理,需要保存tuple音信,这会无会晤招致内存OOM?

Storm为了确保tuple的保险处理,acker会保存该节点制造的tuple
id的xor值,这名ack value,那么每ack一糟,就用tuple id和ack
value做异或(xor)。当所有有的tuple都于ack的时节, ack
value一定为0。这是单非凡简短的策略,对于每一个tuple也只要占约20独字节的内存。对于100万tuple,也才20M左右。关于保险处理看那个:

https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

  • 老三、Storm总计后底结果保存于乌?可以保存在表面存储吗?

Storm不处理总结结果的保留,这是动代码需要负担的事务,假设数量未坏,你可以大概地保存在内存里,也堪每趟都更新数据库,也可行使NoSQL存储。storm并没如s4这样提供一个Persist
API,根据日要容量来开存储输出。这有的业务了交给用户。

数量存储之后的变现,也是你要协调处理的,storm
UI只供针对性topology的督察与总括。

  • 季、Storm怎么处理还的tuple?

因Storm要力保tuple的保险处理,当tuple处理失败或者逾期的当儿,spout会fail并更发送该tuple,那么尽管会面时有爆发tuple重复总结的问题。这些题材是雅不便化解之,storm也无供体制援救而解决。一些行之有效的国策:

(1)不处理,这吗总算种政策。因为实时总计日常并无求老高的精确度,后续的批判处理总括会另行正实时算的误差。

(2)使用第三正在集中储存来过滤,比如采纳mysql,memcached或者redis根据逻辑主键来去重。

(3)使用bloom filter做过滤,简单急速。

  • 五、Storm的动态增删节点

自在storm和s4里比里摆到之动态增删节点,是依storm可以动态地抬高和削减supervisor节点。对于滑坡节点吧,被移除的supervisor上的worker会被nimbus重新负载均衡到此外supervisor节点上。在storm
0.6.1先的本,扩充supervisor节点不会师影响现有的topology,也尽管是现有的topology不碰面再负载均衡到新的节点上,在扩大集群的时刻很不便于,需要再一次提交topology。因而我在storm的邮件列表里提了这一个题目,storm的开发者nathanmarz创设了一个issue
54连当0.6.1资了rebalance命令来深受在周转的topology重新负载均衡,具体表现:

https://github.com/nathanmarz/storm/issues/54

和0.6.1的变更:

http://groups.google.com/group/storm-user/browse_thread/thread/24a8fce0b2e53246

storm并无提供体制来动态调整worker和task数目。

  • 六、Storm UI里spout总结的complete
    latency的切实可行意思是呀?为何emit的数目会是acked的点滴倍增?

此实际是storm邮件列表里之一个题材。Storm作者marz的解答:

The complete latency is the time  from the spout emitting a tuple to
that tuple being acked on the spout. So it tracks the time  for the
whole tuple tree to be processed.

If you dive into the spout component in the UI, you’ll see that a lot
of the emitted/transferred is on the __ack* stream.  This is the
spout communicating with the ackers which take care of tracking the
tuple trees.

简易地游说,complete
latency表示了tuple从emit到于acked经过的时间,能够看是tuple以及该tuple的后续子孙(形成一致株树)整个拍卖时。其次spout的emit和transfered还总结了spout和acker之间内部的通信音讯,比如对于保险处理的spout来说,会以emit的时节又发送一个_NoSQL,ack_init给acker,记录tuple
id到task id的映照,以便ack的时会找到正确的acker task。

另外开源的那一个数额解决方案


自 Google 在 2004 年产 MapReduce 范式以来,已出生了大多单下原始
MapReduce 范式(或富有该范式的身分)的缓解方案。Google 对 MapReduce
的首使用是建万维网的目。虽然是应用程序还是相当盛,但这些简单模型解决之题材啊正值增添。

表 1
供了一个可用开源大数据解决方案的列表,包括传统的批判处理和流式处理应用程序。在将
Storm 引入开源往日即一年的时刻里,Yahoo! 的 S4 分布式流统计平台早已向
Apache 开源。S4 于 2010 年 10 月宣布,它提供了一个胜性能统计 (HPC)
平台,向应用程序开发人士隐藏了并行处理的繁杂。S4
实现了一个然而扩展的、分散化的集群架构,并纳入了一部分容错效用。

表 1. 开源深数额解决方案

解决方案

开发商

类型

描述

Storm

Twitter

流式处理

Twitter 的新流式大数据分析解决方案

S4

Yahoo!

流式处理

来自 Yahoo! 的分布式流计算平台

Hadoop

Apache

批处理

MapReduce 范式的第一个开源实现

Spark

UC Berkeley AMPLab

批处理

支持内存中数据集和恢复能力的最新分析平台

Disco

Nokia

批处理

Nokia 的分布式 MapReduce 框架

HPCC

LexisNexis

批处理

HPC 大数据集群

参考资料


网站地图xml地图