
Hands-on: Combining Spark Streaming, Broadcast, and Accumulator Counters for Online Blacklist Filtering and Counting

Source: 步旅网

This post covers:
1. Combining Spark Streaming with Broadcast and Accumulator
2. Hands-on: online blacklist filtering and counting

I. Combining Spark Streaming with Broadcast and Accumulator:

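In real enterprise projects, broadcasting data to the cluster becomes very powerful once it is combined with accumulators. The broadcast content can be customized freely, so fairly complex logic can be built on top of it.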

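Broadcast variables and accumulators matter for two reasons. The first is their own properties: a broadcast variable keeps one read-only copy of a value on each Executor instead of shipping it with every task, while tasks can only add to an accumulator and its total is read on the driver. The second is that, together, they make fairly complex operations possible. In the online blacklist filtering example, the blacklist is held in a broadcast variable, and an Accumulator counts the blacklisted words that are filtered out.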
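The division of labor described above can be modeled without Spark. The sketch below is a plain-Java analogy, not Spark code: an immutable Set stands in for the broadcast blacklist (every task only reads it), an AtomicInteger stands in for the accumulator (tasks only add to it, and the caller reads the total after all tasks finish), and a two-thread ExecutorService stands in for the Executors. The class name BroadcastAccumulatorModel and the sample words outside the blacklist are hypothetical. Unlike a real accumulator, which merges per-task partial values on the driver, the AtomicInteger here is one shared object; the analogy only covers the "tasks add, driver reads" contract.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical plain-Java analogy of broadcast + accumulator (no Spark involved)
class BroadcastAccumulatorModel {

    // Plays the broadcast variable: one immutable blacklist that every task only reads
    static final Set<String> BLACKLIST =
            Collections.unmodifiableSet(new HashSet<>(Arrays.asList("Hadoop", "Mahout", "Hive")));

    // One "task": filters one partition and adds each blacklisted hit to the shared counter
    static List<String> filterPartition(List<String> partition, AtomicInteger filtered) {
        List<String> kept = new ArrayList<>();
        for (String word : partition) {
            if (BLACKLIST.contains(word)) {
                filtered.incrementAndGet(); // tasks only add; they never read the total
            } else {
                kept.add(word);
            }
        }
        return kept;
    }

    // Runs the partitions on a thread pool (threads stand in for Executors) and gathers
    // the kept words back on the calling thread, roughly like collect() on the driver
    static List<String> run(List<List<String>> partitions, AtomicInteger filtered) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (List<String> partition : partitions) {
                futures.add(pool.submit(() -> filterPartition(partition, filtered)));
            }
            List<String> kept = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                kept.addAll(f.get()); // results come back in submission order
            }
            return kept;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        AtomicInteger filtered = new AtomicInteger(); // plays the accumulator
        List<String> kept = run(Arrays.asList(
                Arrays.asList("Spark", "Hadoop", "Kafka"),
                Arrays.asList("Hive", "Flink", "Hadoop")), filtered);
        // Only after all tasks have finished does the "driver" read the total
        System.out.println(kept + ", filtered = " + filtered.get()); // [Spark, Kafka, Flink], filtered = 3
    }
}
```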

II. Hands-on: online blacklist filtering and counting:

1. The code is as follows:

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * Created by zpf on 2016/8/31.
 *
 * Online blacklist filtering: the blacklist is broadcast to every Executor,
 * and an Accumulator counts how many blacklisted words have been filtered out.
 * Uses the Spark 1.6-era Java API (Accumulator, foreachRDD with VoidFunction2).
 */
public class SparkStreamingBroadcastAccumulator {

    // main must be static to serve as the program entry point
    public static void main(String[] args) throws InterruptedException {

        // local[2]: at least two threads, one for the socket receiver and one for processing
        SparkConf conf = new SparkConf().setMaster("local[2]")
                .setAppName("SparkStreamingBroadcastAccumulator");

        // One batch every 15 seconds
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(15));

        // Broadcast the blacklist so that each Executor holds one read-only copy.
        // Kept in a final local (not a static field) so the closures below serialize it;
        // a static field set on the driver would be null on remote Executors.
        final Broadcast<List<String>> broadcastList =
                jsc.sparkContext().broadcast(Arrays.asList("Hadoop", "Mahout", "Hive"));

        // Global counter: how many blacklisted words have been filtered online
        final Accumulator<Integer> accumulator =
                jsc.sparkContext().accumulator(0, "OnlineBlacklistCount");

        // Each line received from the socket is treated as one word
        JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);

        JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Count each word within the current batch
        JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Blacklist filtering is usually written inside foreachRDD
        wordsCount.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() {
            public void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
                List<Tuple2<String, Integer>> kept = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
                    public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
                        if (broadcastList.value().contains(wordPair._1())) {
                            // Blacklisted: add its count to the accumulator and drop it.
                            // Updates made inside a transformation may be applied more than
                            // once if a task is re-executed.
                            accumulator.add(wordPair._2());
                            return false;
                        }
                        return true;
                    }
                }).collect(); // collect() is the action that actually runs the filter

                // accumulator.value() is read on the driver; it is a running total across batches
                System.out.println("Batch " + time + ", kept: " + kept);
                System.out.println("Blacklisted words filtered so far: " + accumulator.value() + " times");
            }
        });

        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
}
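To reason about what each batch prints, the per-batch computation can be traced with a small plain-Java model. This is a hedged sketch with no Spark dependency, and the class BlacklistBatchModel with its methods is hypothetical. It mirrors the program above under its own assumption that each input line is one word: lines are mapped to (word, 1) and summed per key as reduceByKey would, then blacklisted keys are dropped and their summed counts are added to a counter that, like the accumulator, is never reset between batches.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical single-process model of one streaming batch (no Spark involved)
class BlacklistBatchModel {

    private final Set<String> blacklist; // plays the broadcast blacklist
    private int filtered = 0;            // plays the accumulator: never reset between batches

    BlacklistBatchModel(Set<String> blacklist) {
        this.blacklist = blacklist;
    }

    // One batch: each line is one word -> (word, 1) -> per-key sum -> blacklist filter
    Map<String, Integer> processBatch(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String word : lines) {
            counts.merge(word, 1, Integer::sum); // like mapToPair + reduceByKey
        }
        Map<String, Integer> kept = new TreeMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (blacklist.contains(e.getKey())) {
                filtered += e.getValue(); // like accumulator.add(wordPair._2())
            } else {
                kept.put(e.getKey(), e.getValue());
            }
        }
        return kept;
    }

    int filteredSoFar() {
        return filtered;
    }

    public static void main(String[] args) {
        BlacklistBatchModel model =
                new BlacklistBatchModel(new HashSet<>(Arrays.asList("Hadoop", "Mahout", "Hive")));
        // Batch 1 prints: {Kafka=1, Spark=1} filtered so far: 2
        System.out.println(model.processBatch(Arrays.asList("Spark", "Hadoop", "Hadoop", "Kafka"))
                + " filtered so far: " + model.filteredSoFar());
        // Batch 2 prints: {Spark=1} filtered so far: 3
        System.out.println(model.processBatch(Arrays.asList("Hive", "Spark"))
                + " filtered so far: " + model.filteredSoFar());
    }
}
```

Because the counter is never reset, the value printed after each batch is a running total since the application started, not a per-batch count.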

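2. On the host Master (the host the program connects to), start a listener on port 9999 before launching the program, then type words into it, one per line: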

nc -lk 9999

3. Observe the results in the driver's console output
