Real-time statistics with Spark Streaming (an application of accumulators)

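If missing code keeps this from running, please leave a comment saying what is missing and I will fill it in ❤️

Scenario

Read real-time data from Kafka, clean and filter it, then merge it with the historical data of the current day and deduplicate. Aggregate the merged data set and write the result to HBase. When the next day begins, clear the previous day's historical data and start the statistics over.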

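Implementation logic

  1. Use Spark Streaming to read the real-time stream from Kafka and create a DStream.
  2. Keep only the records that meet the criteria, producing a DStream[k,v] (note: k is the record's unique key, v is the full record).
  3. Call mapWithState on the DStream[k,v] to produce the deduplicated data set.
  4. Call awaitTerminationOrTimeout(time) on the StreamingContext. The call blocks for at most the given time, so passing the time remaining until 24:00 makes it return at midnight, at which point all of the DStream computations above can be stopped.
  5. Call stop on the StreamingContext to terminate it. By default stop also stops the SparkContext; calling stop(stopSparkContext:Boolean = false,stopGracefully:Boolean = true) keeps the SparkContext alive and lets the batches the StreamingContext has already received finish before it stops.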
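
The update rule behind step 3 can be tried without a cluster. The following is only a minimal plain-Java sketch of that rule, not the Spark job itself: `LatestByKey` and `Trade` are hypothetical names, the `HashMap` stands in for the state that mapWithState keeps between batches, and the timestamps are assumed to be strings in `yyyy-MM-dd HH:mm:ss` form so that string comparison matches time order.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch of the per-key "keep the latest record" rule; no Spark involved. */
final class LatestByKey {

    /** Hypothetical stand-in for ApiTrade with only the fields the rule needs. */
    static final class Trade {
        final String key;       // unique key of the record
        final String modified;  // modification time, "yyyy-MM-dd HH:mm:ss"
        final int amount;

        Trade(String key, String modified, int amount) {
            this.key = key;
            this.modified = modified;
            this.amount = amount;
        }
    }

    // Plays the role of the state kept across batches.
    private final Map<String, Trade> state = new HashMap<>();

    /** Insert the record if its key is new; replace the stored one only if this one is strictly newer. */
    void accept(Trade incoming) {
        state.merge(incoming.key, incoming,
                (old, cur) -> cur.modified.compareTo(old.modified) > 0 ? cur : old);
    }

    /** Sum of amounts over the deduplicated snapshot. */
    int totalAmount() {
        return state.values().stream().mapToInt(t -> t.amount).sum();
    }

    /** Number of distinct keys currently held. */
    int size() {
        return state.size();
    }

    public static void main(String[] args) {
        LatestByKey dedup = new LatestByKey();
        dedup.accept(new Trade("k1", "2019-09-11 10:00:00", 100));
        dedup.accept(new Trade("k1", "2019-09-11 10:05:00", 120)); // newer: replaces the first
        dedup.accept(new Trade("k1", "2019-09-11 09:00:00", 90));  // older: ignored
        dedup.accept(new Trade("k2", "2019-09-11 10:01:00", 50));
        System.out.println(dedup.size() + " records, total " + dedup.totalAmount());
    }
}
```

Because every raw record has to stay in the map for the comparison to work, memory grows with the number of distinct keys seen during the day.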

Java code

RealStatStreaming.java

import kafka.utils.ZKGroupTopicDirs;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;

import java.text.ParseException;
import java.text.SimpleDateFormat;
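import java.util.*;

// Constants such as KAFKA_ROOT_PATH come from GlobalConfig; package name taken from the import in ZookeeperFactory.java
import static com.wangdian.spark.tasks.system.GlobalConfig.*;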

/**
 * Real-time statistics job.
 * @date 2019-09-03 17:57:38
 * @author Ludengke
 */
public final class RealStatStreaming {
    private static final String OFFSET_DIR = KAFKA_ROOT_PATH.concat(new ZKGroupTopicDirs(REALSTAT_GROUP_ID, CONSUMER_TOPIC_NAME).consumerOffsetDir());

    private static SparkSession sparkSession = null;
    private static JavaStreamingContext sc = null;

    public static void main(String[] args) throws Exception {

        SparkConf sparkConf = SparkFactory.getDefaultSparkConf()
                .set("spark.sql.shuffle.partitions","24")
                .setAppName("RealStatStreaming");

        sparkSession = SparkSession.builder()
                .config(sparkConf)
                .getOrCreate();


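        // Build the sc object from the Spark configuration
        /**
         * There are two ways to build it: if the checkpoint has content, start from the last checkpoint;
         * otherwise create a new one. After the code is recompiled, the checkpoint must be deleted.
         */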
        sc = JavaStreamingContext.getOrCreate(CHECK_POINT_DIR, (Function0<JavaStreamingContext>) () -> {
            sc = new JavaStreamingContext(JavaSparkContext.fromSparkContext(sparkSession.sparkContext()), Durations.seconds(REALSTAT_DURATIONS_SECOND));
            sc.checkpoint(CHECK_POINT_DIR);
            return sc;
        });

        // Kafka configuration
        Map<String, Object> kafkaParams = new HashMap<>(16);
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_QUORUM);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, REALSTAT_GROUP_ID);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        RealStatStreaming.work(kafkaParams);


        sc.start();
        sc.awaitTerminationOrTimeout(getNeedRunTime());
        sc.stop();
    }

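    /**
     * Computes how long the real-time statistics job still needs to run:
     * 00:00 tomorrow minus the current time.
     * @return remaining run time in milliseconds
     * @throws ParseException if the generated date string cannot be parsed
     */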
    private static long getNeedRunTime() throws ParseException {
        SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date now = new Date();
        String tomorrowMidnight = SystemUtils.getDateAddDays(sdfDate.format(now).substring(0,10),1)+ " 00:00:00";
        Date tomorrow = sdfDate.parse(tomorrowMidnight);
        return tomorrow.getTime()-now.getTime();
    }


    private static void work(Map<String, Object> kafkaParams) {

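        // Build the streaming object from the Kafka configuration and the sc object
        JavaInputDStream<ConsumerRecord<String, String>> stream = RealStatStreaming.getStreaming(sc,kafkaParams);

        // Extract the value from each Kafka record
        JavaDStream<String> lines = stream.map(ConsumerRecord::value);
        /**
         * FormatData converts the data into <key,bean> pairs and filters them.
         * mapWithState merges the historical data with the current data and deduplicates it.
         * stateSnapshots returns every value held in state; without it only the values of the current batch are included.
         * statAndSave aggregates the raw orders and saves the result to HBase.
         *
         * PS: if the in-memory state needs initial values, use StateSpec.function(update function).initialState(initial RDD)
         */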
        SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd");
        String date = sdfDate.format(new Date());
        RealStatStreaming.statAndSave(RealStatStreaming.FormatData(lines,date).mapWithState(StateSpec.function(RealStatStreaming::mappingFunc)).stateSnapshots(),date);

        // Update the offsets stored in Zookeeper
        stream.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            for (OffsetRange o : offsetRanges) {
                ZookeeperFactory.getZkUtils().updatePersistentPath(
                        String.join(ZK_SPLIT, OFFSET_DIR, String.valueOf(o.partition())),
                        String.valueOf(o.fromOffset()),
                        ZookeeperFactory.getZkUtils().DefaultAcls()
                );
                SystemUtils.info("UPDATE OFFSET WITH [ topic :" + o.topic() + " partition :" + o.partition() + " offset :" + o.fromOffset() + " ~ " + o.untilOffset() + " ]");
            }
        });
    }

    /**
     * Creates the DStream from the StreamingContext and the Kafka configuration
     */
    private static JavaInputDStream<ConsumerRecord<String, String>> getStreaming(JavaStreamingContext context, Map<String, Object> kafkaParams) {
        // Read the offset nodes under the offset storage path
        if (!ZookeeperFactory.getZkClient().exists(OFFSET_DIR)) {
            ZookeeperFactory.getZkClient().createPersistent(OFFSET_DIR, true);
        }
        List<String> children = ZookeeperFactory.getZkClient().getChildren(OFFSET_DIR);

        if (children != null && !children.isEmpty()) {
            Map<TopicPartition, Long> fromOffsets = new HashMap<>(children.size());
            // Offsets were found in Zookeeper: create the stream from those offsets to read Kafka
            for (String child : children) {
                long offset = Long.valueOf(ZookeeperFactory.getZkClient().readData(String.join(ZK_SPLIT, OFFSET_DIR, child)));
                fromOffsets.put(new TopicPartition(CONSUMER_TOPIC_NAME, Integer.valueOf(child)), offset);
                SystemUtils.info("FOUND OFFSET IN ZOOKEEPER, USE [ partition :" + child + " offset :" + offset + " ]");
            }
            SystemUtils.info("CREATE DIRECT STREAMING WITH CUSTOMIZED OFFSET..");
            return KafkaUtils.createDirectStream(
                    context,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Assign(new HashSet<>(fromOffsets.keySet()), kafkaParams, fromOffsets)
            );
        } else {
            // No offsets stored in Zookeeper: create the stream with the default offsets
            SystemUtils.info("NO OFFSET FOUND, CREATE DIRECT STREAMING WITH DEFAULT OFFSET..");
            return KafkaUtils.createDirectStream(
                    context,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.Subscribe(Collections.singleton(CONSUMER_TOPIC_NAME), kafkaParams)
            );
        }
    }


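    /**
     * Filters the data to be aggregated by the given date
     * @param lines the data stream to process
     * @param date the date used as the filter condition
     * @return <key,bean> pairs whose d or e field falls on the given date
     */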
    private static JavaPairDStream<String,ApiTrade> FormatData (JavaDStream<String> lines,String date) {
        return lines.mapPartitionsToPair(RealStatStreaming::JsonToPairTradeBean).filter((Function<Tuple2<String, ApiTrade>, Boolean>) line->{
            if (line._2==null|| line._2.getD()==null || line._2.getE()==null){
                return false;
            }
            if (date.equals(line._2.getD().substring(0,10)) || date.equals(line._2.getE().substring(0,10))){
                return true;
            }else {
                return false;
            }
        });
    }


    /**
     * Aggregates the snapshot of the in-memory state and writes the result to HBase
     * @param lines snapshot of the in-memory state
     * @param date row key of the HBase table
     */
    private static void statAndSave(JavaPairDStream<String,ApiTrade> lines,String date) {
        lines.foreachRDD(tmp->{
            JavaRDD<ApiTrade> apiTrade = tmp.map((Function<Tuple2<String, ApiTrade>, ApiTrade>) v1 -> {
                return v1._2;
            });
            Dataset<ApiTrade> tradeData = sparkSession.createDataset(apiTrade.rdd(), Encoders.bean(ApiTrade.class));
            tradeData.createOrReplaceTempView("data");
            // Leading comma separates these columns from the group column "b" in the joined statement
            String selectSql = ", count(1) count,sum(g) as money";
            String groupSql = " group by b";
            Dataset<Row> allStatData = sparkSession.sql(String.join(" ", "select ", "b", selectSql, " from data", groupSql));
            /**
             * Create the HBase table; the call checks whether the table already exists.
             */
            HBaseFactory.createTables("daily_total_stat",500);

            /**
             * Write the aggregated totals to HBase
             */
            allStatData.rdd().toJavaRDD().foreach(line->{
                Put put = new Put(Bytes.toBytes(date));
                put.addColumn(
                        Bytes.toBytes(COLUMN_FAMILY),
                        Bytes.toBytes("count"),
                        Bytes.toBytes(line.getAs("count").toString()));
                put.addColumn(
                        Bytes.toBytes(COLUMN_FAMILY),
                        Bytes.toBytes("money"),
                        Bytes.toBytes(line.getAs("money").toString()));
                HBaseFactory.writeToHBase("daily_total_stat",put);
            });
        });
    }

    /**
     * Converts the JSON data of one partition into beans
     * @param s iterator over the data of one partition
     * @return iterator over the parsed beans
     */
    private static Iterator<ApiTrade> JsonToTradeBean(Iterator<String> s){
        ArrayList<ApiTrade> tmp = new ArrayList<>();
        while (s.hasNext()) {
            ApiTrade apiTrade = SystemUtils.LOWER_CASE_WITH_UNDERSCORES_GSON.fromJson(s.next(), ApiTrade.class);
            tmp.add(apiTrade);
        }
        return tmp.iterator();
    }

    /**
     * Converts the JSON data of one partition into <key,bean> pairs
     * @param s iterator over the data of one partition
     * @return iterator over the <key,bean> pairs
     */
    private static Iterator<Tuple2<String,ApiTrade>> JsonToPairTradeBean(Iterator<String> s){
        ArrayList<Tuple2<String,ApiTrade>> tmp = new ArrayList<>();
        while (s.hasNext()) {
            ApiTrade apiTrade = SystemUtils.LOWER_CASE_WITH_UNDERSCORES_GSON.fromJson(s.next(), ApiTrade.class);
            tmp.add(new Tuple2<String,ApiTrade>(MD5Utils.encode(apiTrade.getA() + apiTrade.getB() + apiTrade.getC()), apiTrade));
        }
        return tmp.iterator();
    }


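    /**
     * Merges the current value and the historical value of a key into the new value for that key
     * @param key key of the historical data
     * @param one current value for the key
     * @param curState historical value for the key
     * @return the key together with the value kept in state
     */
    private static Tuple2<String,ApiTrade> mappingFunc (String key, Optional<ApiTrade> one, State<ApiTrade> curState){
        // Check whether one holds a value
        if (one.isPresent()) {
            // Take the value from the current batch
            ApiTrade oneTrade = one.get();
            // If no historical value exists, insert directly; otherwise decide whether to update
            if (curState.exists()) {
                // Take the historical value; if it is empty or the current value has a later modification time, replace it with the current value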
                ApiTrade curStateTrade = curState.getOption().isEmpty()?null:curState.getOption().get();
                if(curStateTrade==null || oneTrade.getF().compareTo(curStateTrade.getF())>0){
                    curState.update(oneTrade);
                }
            } else {
                curState.update(oneTrade);
            }
        }
        return new Tuple2<>(key,curState.get());
    }
}
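
getNeedRunTime above builds tomorrow's midnight by formatting and re-parsing a date string. As an alternative, the same duration can be computed with java.time. This is only a sketch: `MidnightTimer` is a hypothetical name, and it works on `LocalDateTime`, so it assumes the local day has exactly 24 hours and ignores daylight-saving shifts.

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Sketch: time left until the next local midnight, without string formatting. */
final class MidnightTimer {

    /** Milliseconds from the given moment until 00:00 of the following day. */
    static long millisUntilNextMidnight(LocalDateTime now) {
        LocalDateTime nextMidnight = now.toLocalDate().plusDays(1).atStartOfDay();
        return Duration.between(now, nextMidnight).toMillis();
    }

    public static void main(String[] args) {
        // The result would be passed to awaitTerminationOrTimeout in the job above.
        System.out.println(millisUntilNextMidnight(LocalDateTime.now()));
    }
}
```

Passing the clock value in as a parameter keeps the method easy to check with fixed timestamps.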

SystemUtils.java

import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;

/**
 * @author ludengke
 * @date 2019/9/11
 **/
public class SystemUtils {
    /**
     *   Shifts a date by the given number of days
     */
    public static String getDateAddDays(String date ,Integer count) throws ParseException {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd" );
        Calendar cal = Calendar.getInstance();
        cal.setTime(format.parse(date));
        cal.add(Calendar.DATE, count);
        return format.format(cal.getTime());
    }

    public static final Gson DEFAULT_GSON = new GsonBuilder().create();
    public static final Gson LOWER_CASE_WITH_UNDERSCORES_GSON = new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create();
}
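
getDateAddDays can also be written with java.time, whose classes are immutable and therefore safe to share between threads, unlike SimpleDateFormat. A minimal sketch, assuming the input is always in `yyyy-MM-dd` form; `DateShift` is a hypothetical name and not part of the original project.

```java
import java.time.LocalDate;

/** Sketch: shift a yyyy-MM-dd date string by a number of days. */
final class DateShift {

    /** Parses an ISO date, adds (or subtracts, if negative) the days, and formats it back. */
    static String addDays(String date, int count) {
        return LocalDate.parse(date).plusDays(count).toString();
    }

    public static void main(String[] args) {
        System.out.println(addDays("2019-12-31", 1)); // crosses a year boundary
    }
}
```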

SparkFactory.java

import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.sql.Row;

/**
 * @author ludengke
 * @date 2019/9/11
 **/
public class SparkFactory {
    /**
     * Configuration shared by all jobs
     *
     * @desc https://spark.apache.org/docs/latest/configuration.html
     */
    public static SparkConf getDefaultSparkConf() {
        return new SparkConf()
                .set("spark.shuffle.file.buffer", "1024k")
                .set("spark.reducer.maxSizeInFlight", "128m")
                .set("spark.shuffle.memoryFraction", "0.3")
                .set("spark.streaming.stopGracefullyOnShutdown", "true")
                .set("spark.streaming.kafka.maxRatePerPartition", "300")
                .set("spark.serializer", KryoSerializer.class.getCanonicalName())
                .registerKryoClasses(new Class[]{Row.class,Object.class,ApiTrade.class});
    }
}

GlobalConfig.java

/**
 * @author ludengke
 * @date 2019/9/11
 **/
public class GlobalConfig {
    /**
     * Root path of the Kafka configuration in Zookeeper
     */
    public static final String KAFKA_ROOT_PATH = "/kafka";
    /**
     * Checkpoint output directory, on HDFS.
     */
    public static final String CHECK_POINT_DIR = "/user/hdfs/RealStatStreamingCheckpoint";

    /**
     * Consumer group id of the real-time statistics job
     */
    public static final String REALSTAT_GROUP_ID = "realstat";

    /**
     * Batch interval of the real-time statistics streaming job, in seconds
     */
    public static final long REALSTAT_DURATIONS_SECOND = 60L;

    /**
     * Kafka connection
     */
    public static final String KAFKA_QUORUM = "kafka1:9092,kafka2:9092,kafka3:9092";

    /**
     * How the initial Kafka offset is chosen
     */
    public static final String AUTO_OFFSET_RESET = "earliest";

    /**
     * Zookeeper path separator
     */
    public static final String ZK_SPLIT = "/";

    /**
     * Zookeeper connection
     */
    public static final String ZOOKEEPER_QUORUM = "kafka1:2181,kafka2:2181,kafka3:2181";

    /**
     * HBase column family name
     */
    public static final String COLUMN_FAMILY = "default";

}

MD5Utils.java

import java.security.MessageDigest;

/**
 * @author : LiuWeidong
 * @date : 2018/12/29.
 */

public class MD5Utils {

    private static final String[] HEX_DIG_ITS = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"};

    public static String encode(String origin) {
        String resultString = null;
        try {
            resultString = origin;
            MessageDigest md = MessageDigest.getInstance("MD5");
            resultString = byteArrayToHexString(md.digest(resultString.getBytes()));
        } catch (Exception ignored) {
        }
        return resultString;
    }


    private static String byteArrayToHexString(byte[] b) {
        StringBuilder resultSb = new StringBuilder();
        for (byte aB : b) {
            resultSb.append(byteToHexString(aB));
        }
        return resultSb.toString();
    }

    private static String byteToHexString(byte b) {
        int n = b;
        if (n < 0) {
            n += 256;
        }
        int d1 = n / 16;
        int d2 = n % 16;
        return HEX_DIG_ITS[d1] + HEX_DIG_ITS[d2];
    }
}
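
MD5Utils above uses the platform default charset and silently returns the unhashed input if an exception occurs. The variant below is only a sketch of one way to tighten that, not the author's code: `Md5Hex` is a hypothetical name, the charset is fixed to UTF-8 (an assumption about what the keys should hash as), and a missing algorithm is surfaced as an exception.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Sketch: MD5 of a string as lowercase hex, with an explicit charset. */
final class Md5Hex {

    static String encode(String origin) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(origin.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // Mask to 0..255 so negative bytes still print as two hex digits.
                sb.append(String.format("%02x", b & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every standard Java platform is required to provide MD5, so this is not expected.
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(encode("abc")); // prints 900150983cd24fb0d6963f7d28e17f72
    }
}
```

For ASCII-only keys the result is the same as with the default charset; the two only differ when the key contains non-ASCII characters and the platform default is not UTF-8.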

ApiTrade.java


import java.io.Serializable;

/**
 * @author ldk
 */
public class ApiTrade implements Serializable {

    private String a;
    private String b;
    private String c;
    private String d;
    private String e;
    private String f;
    private Integer g;

    public String getA() {
        return a;
    }

    public void setA(String a) {
        this.a = a;
    }

    public String getB() {
        return b;
    }

    public void setB(String b) {
        this.b = b;
    }

    public String getC() {
        return c;
    }

    public void setC(String c) {
        this.c = c;
    }

    public String getD() {
        return d;
    }

    public void setD(String d) {
        this.d = d;
    }

    public String getE() {
        return e;
    }

    public void setE(String e) {
        this.e = e;
    }

    public String getF() {
        return f;
    }

    public void setF(String f) {
        this.f = f;
    }

    public Integer getG() {
        return g;
    }

    public void setG(Integer g) {
        this.g = g;
    }
}

ZookeeperFactory.java


import com.wangdian.spark.tasks.system.GlobalConfig;
import com.wangdian.spark.tasks.utils.serializer.CustomSerializer;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

/**
 * @author : ldk
 * @date : 2019/1/24.
 */
public class ZookeeperFactory {

    private static final ZkConnection ZK_CONNECTION = new ZkConnection(GlobalConfig.ZOOKEEPER_QUORUM);
    private static final ZkClient ZK_CLIENT = new ZkClient(getZkConnection(), GlobalConfig.ZOOKEEPER_CONNECTION_TIMEOUT, new CustomSerializer());
    private static final ZkUtils ZK_UTILS = new ZkUtils(getZkClient(), getZkConnection(), false);

    public static ZkConnection getZkConnection() {
        return ZK_CONNECTION;
    }

    public static ZkClient getZkClient() {
        return ZK_CLIENT;
    }

    public static ZkUtils getZkUtils() {
        return ZK_UTILS;
    }

    private ZookeeperFactory() {
    }
}

HBaseFactory.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

/**
 * @author : ldk
 * @date : 2019/1/22.
 */
public class HBaseFactory {

    private static Connection conn = null;
    private static Configuration conf = null;

    public static Configuration getHBaseConf() {
        if (conf == null) {
            conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", GlobalConfig.ZOOKEEPER_QUORUM);
            conf.set("zookeeper.znode.parent", GlobalConfig.HBASE_ZNODE_PARENT);
        }
        return conf;
    }

    public static Connection createHBaseConn() {
        if (conn == null || conn.isClosed()) {
            try {
                conn = ConnectionFactory.createConnection(getHBaseConf());
            } catch (IOException e) {
                SystemUtils.error("Failed to create HBase connection: ", e);
            }
        }
        return conn;
    }

    public static synchronized void createTables(String tableName,Integer version) {
        try (Admin admin = HBaseFactory.createHBaseConn().getAdmin()) {
            if (admin.tableExists(TableName.valueOf(tableName))) {
                return;
            }
            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
            builder.setColumnFamily(
                    ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(GlobalConfig.COLUMN_FAMILY))
                            /**
                             * Enable compression
                             */
                            .setCompressionType(Compression.Algorithm.SNAPPY)
                            /**
                             * Set the maximum number of stored versions
                             */
                            .setMaxVersions(version)
                            .build()
            );
            if (!admin.tableExists(TableName.valueOf(tableName))) {
                admin.createTable(builder.build());
            }
        } catch (Exception e) {
            SystemUtils.error("Failed to create HBase table: " + tableName, e);
        }
    }

    public static boolean isTableExist(String tableName) {
        try (Admin admin = HBaseFactory.createHBaseConn().getAdmin()) {
            return admin.tableExists(TableName.valueOf(tableName));
        } catch (Exception e) {
            SystemUtils.error("Failed to check HBase table status: " + tableName, e);
        }
        return false;
    }

    public static void writeToHBase(String tableName, Put put) {
        Table table = null;
        try {
            table = HBaseFactory.createHBaseConn().getTable(TableName.valueOf(tableName));
            table.put(put);
        } catch (Exception e) {
            SystemUtils.error("Failed to write to HBase, TABLE NAME :" + "[ " + tableName + " ]", e);
        }
        finally {
            try {
                if(table!=null){
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void writeToHBase(String tableName, List<Put> puts) {
        Table table = null;
        try {
            table = HBaseFactory.createHBaseConn().getTable(TableName.valueOf(tableName));
            table.put(puts);
        } catch (Exception e) {
            SystemUtils.error("Failed to write to HBase, TABLE NAME :" + "[ " + tableName + " ]", e);
        }
        finally {
            try {
                if(table!=null){
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Run screenshot

20190911182414.png

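PS: ignore the run time of the last job. This screenshot is from an abnormal streaming run; I did not manage to capture a normal one.
The in-memory persisted RDD is merged once every ten batches. A normal batch runs two jobs; every tenth batch runs three, the extra one being the job that merges the in-memory persisted RDD.

Interested readers can look into why some of the tasks show up as skipped.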

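Summary

This job makes certain demands on the cluster: it keeps the data being aggregated in memory, which makes the computation fast but needs a lot of memory. Keeping only the aggregated results in memory would take considerably less, but because of a special business requirement mapWithState is used here as a deduplication function.
The following example keeps the aggregated results in memory instead: "Spark Streaming: real-time cumulative daily PV traffic per merchant".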
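
To make the memory trade-off concrete, here is a minimal plain-Java sketch of state that holds only running totals per group rather than every record. `RunningTotals` and its methods are hypothetical names and this is not the code of the linked example; it mirrors the `count` and `money` columns computed by the SQL above. Note what is given up: once a record is folded into a total, a later duplicate or update of it can no longer be detected, which is why the job in this post keeps the raw records.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: keep only aggregated results per group in memory. */
final class RunningTotals {

    /** Mutable totals for one group. */
    private static final class Total {
        long count;
        long money;
    }

    // One small entry per group, regardless of how many records were seen.
    private final Map<String, Total> totals = new HashMap<>();

    /** Fold one record into the totals of its group. */
    void add(String group, int money) {
        Total t = totals.computeIfAbsent(group, g -> new Total());
        t.count++;
        t.money += money;
    }

    /** Number of records seen for the group, 0 if the group is unknown. */
    long count(String group) {
        Total t = totals.get(group);
        return t == null ? 0 : t.count;
    }

    /** Sum of money seen for the group, 0 if the group is unknown. */
    long money(String group) {
        Total t = totals.get(group);
        return t == null ? 0 : t.money;
    }

    public static void main(String[] args) {
        RunningTotals totals = new RunningTotals();
        totals.add("shopA", 100);
        totals.add("shopA", 20);
        totals.add("shopB", 50);
        System.out.println("shopA: " + totals.count("shopA") + " records, money " + totals.money("shopA"));
    }
}
```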
