A Spark Streaming Out-of-Order Case Study

An analysis of out-of-order data synchronization caused by scheduler configuration

Posted by Cheney.Yin on April 11, 2020

Spark Streaming RDD Out-of-Order Execution

Problem Description

Spark Streaming RDD out-of-order execution refers to the following situation: with fair scheduling enabled, Spark Streaming submits jobs periodically and these jobs may run concurrently. Under strict delivery semantics (such as exactly-once), Spark Streaming has to acknowledge its pull progress to the upstream system; for example, the Kafka consumer commits message offsets to the Kafka broker. However, although the concurrently running jobs share the same processing logic, each job's execution time is hard to predict accurately.

  • When a successor RDD's job finishes before its predecessor's, the Kafka broker records the successor's offsets. If the streaming application fails at this moment, the restarted application pulls data starting from the last offset committed to Kafka, and the predecessor RDD's job, which had not yet finished, is simply skipped, so its data is lost (of course, checkpointing each RDD first would solve this).
  • When a successor RDD's job finishes before its predecessor's, the Kafka broker records the successor's offsets; then the predecessor's job also finishes and the broker records the predecessor's offsets. The broker now disregards the data the successor RDD has already processed and treats the predecessor's committed offset as the latest consumption progress. If a failure occurs at this point, the restarted application pulls data from Kafka again and re-consumes the data the successor RDD had already processed, so exactly-once cannot be achieved.

Reproducing the Scenario

The following code reproduces the phenomenon:

Map<String, Object> kafkaParams = new HashMap<>();
Set<String> topics = new HashSet<>();
String bootStrapServer = "192.168.32.61:9092";
String testTopic = "test-data";
String directory = "/home/yanchengyu/kafka-dump";
kafkaParams.put("bootstrap.servers", bootStrapServer);
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("group.id", "Kafka Debuger");
kafkaParams.put("enable.auto.commit", "false");
kafkaParams.put("session.timeout.ms", "90000");
kafkaParams.put("request.timeout.ms", "95000");
kafkaParams.put("fetch.max.wait.ms", "90000");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
topics.add(testTopic);

SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Kafka Debuger");
conf.set("spark.scheduler.allocation.file", "/home/yanchengyu/spark/fair.xml");
conf.set("spark.scheduler.mode", "FAIR");
conf.set("spark.streaming.concurrentJobs", "4");
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
ssc.sparkContext().setLocalProperty("spark.scheduler.pool", "production");
JavaInputDStream<ConsumerRecord<String, String>> stream =  KafkaUtils.createDirectStream(ssc, 
        LocationStrategies.PreferConsistent(), 
        ConsumerStrategies.Subscribe(topics, kafkaParams));

// The accumulator starts at 3 and is decremented in each commit callback,
// forcing offsets to be committed in reverse RDD order (3, 2, 1, 0).
LongAccumulator acc = ssc.sparkContext().sc().longAccumulator();
acc.setValue(3);

List<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition(testTopic, 0));

stream.foreachRDD(rdd -> {

    HasOffsetRanges hors = (HasOffsetRanges)(rdd.rdd());
    OffsetRange[] ranges = hors.offsetRanges();
    int rddId = rdd.id();
    rdd.foreachPartition(iter -> {
        int partitionId = TaskContext.getPartitionId();
        String partitionFile = String.format("%d_%d_%d-%d", 
                rddId, partitionId, ranges[partitionId].fromOffset(), ranges[partitionId].untilOffset());
        File dumpFile = new File(directory + "/" + partitionFile);
        if (dumpFile.exists()) {
            dumpFile.delete();
        } else {
            dumpFile.createNewFile();
        }

        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(dumpFile)));
        while(iter.hasNext()) {
            ConsumerRecord<String, String> record = iter.next();
            writer.write(record.key() + "\t" + record.value() + "\n");
        }
        writer.flush();
        writer.close();

    });

    // Only RDDs 0-3 take part in the reversed commit; later batches dump
    // their data but never commit offsets.
    if (rdd.id() > 3) {
        return ;
    }
    // Busy-wait until it is this RDD's turn, i.e. until every higher-numbered
    // RDD among 0-3 has already committed.
    while(rddId != acc.value()) {;}

    CanCommitOffsets committer = (CanCommitOffsets)stream.inputDStream();
    committer.commitAsync(ranges, new OffsetCommitCallback() {

        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            // Write a marker file recording the committed offset, then update
            // the accumulator so the next commit in line can proceed.
            for (TopicPartition topicPartition : topicPartitions) {

                long offset = offsets.get(topicPartition).offset();
                String marker = String.format("%d_%d_%d_-%d", 
                        System.currentTimeMillis(), rddId, topicPartition.partition(), offset);
                File offsetMarker = new File(directory + "/" + marker);
                if (offsetMarker.exists()) {
                    offsetMarker.delete();
                } else {
                    try {
                        offsetMarker.createNewFile();
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }	
            }
            acc.add(-1);
        }
    });
});
ssc.start();
try {
    ssc.awaitTermination();
} catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}
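
The repro points spark.scheduler.allocation.file at /home/yanchengyu/spark/fair.xml and submits its jobs to a pool named production. The original post does not show that file; a minimal allocation file in the format documented for Spark's fair scheduler could look like this:

<?xml version="1.0"?>
<allocations>
  <!-- The pool name must match the value set via spark.scheduler.pool ("production"). -->
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>0</minShare>
  </pool>
</allocations>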

The dump directory after a run looks like this:

-rw-r--r--  1 root       root       648815 Jan 11 10:32 0_0_442728-477283
-rw-r--r--  1 root       root       331816 Jan 11 10:32 1_0_477283-494952
-rw-r--r--  1 root       root            0 Jan 11 10:32 1547173970001_3_0_-547959
-rw-r--r--  1 root       root            0 Jan 11 10:32 1547173975001_2_0_-530290
-rw-r--r--  1 root       root            0 Jan 11 10:33 1547173980001_1_0_-494952
-rw-r--r--  1 root       root            0 Jan 11 10:33 1547173985002_0_0_-477283
-rw-r--r--  1 root       root       663632 Jan 11 10:32 2_0_494952-530290
-rw-r--r--  1 root       root       331816 Jan 11 10:32 3_0_530290-547959
-rw-r--r--  1 root       root       663632 Jan 11 10:32 4_0_547959-583297
-rw-r--r--  1 root       root            0 Jan 11 10:32 5_0_583297-583297
-rw-r--r--  1 root       root            0 Jan 11 10:33 6_0_583297-583297
-rw-r--r--  1 root       root            0 Jan 11 10:33 7_0_583297-583297
-rw-r--r--  1 root       root            0 Jan 11 10:33 8_0_583297-583297
-rw-r--r--  1 root       root            0 Jan 11 10:33 9_0_583297-583297

The directory contains two kinds of files. Files such as 1_0_477283-494952 are data dump files: 1 is the RDD ID, 0 is the partition number, and 477283-494952 means the corresponding Kafka offset range is $[477283, 494952)$. Files such as 1547173970001_3_0_-547959 record committed consumption progress: 1547173970001 is a timestamp, 3 is the RDD ID, 0 is the partition number, and 547959 is the committed offset.

The listing shows that the Kafka broker recorded offsets coming from RDD 3, RDD 2, RDD 1 and RDD 0, in that order, so the last committed offset is 477283, the upper bound of RDD 0's range. If a failure occurred and the application were restarted, it would consume the data in $[477283, 547959)$ again, even though RDDs 1-3 had already processed it.
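
To confirm what the broker actually recorded, independently of the marker files, the group's committed offset can be read back with a plain Kafka consumer. This is a minimal sketch, not part of the original post; it reuses the bootstrap server, group.id and topic from the repro and calls org.apache.kafka.clients.consumer.KafkaConsumer directly:

Properties props = new Properties();
props.put("bootstrap.servers", bootStrapServer);
props.put("group.id", "Kafka Debuger");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    TopicPartition tp = new TopicPartition(testTopic, 0);
    // The last offset committed for this consumer group and partition,
    // i.e. where a restarted application would resume reading.
    OffsetAndMetadata committed = consumer.committed(tp);
    System.out.println("committed offset: " + (committed == null ? "none" : committed.offset()));
}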

Solutions

  • Commit offsets in the order the RDDs are generated. This removes the mismatch between the offsets kept by the Kafka broker and the actual consumption history. Using an accumulator to enforce the ordering is one way to implement it:

    JavaInputDStream<ConsumerRecord<String, String>> stream =  KafkaUtils.createDirectStream(ssc, 
            LocationStrategies.PreferConsistent(), 
            ConsumerStrategies.Subscribe(topics, kafkaParams));
      
    // The accumulator starts at 0 and is incremented in each commit callback,
    // so offsets are committed strictly in RDD generation order (0, 1, 2, ...).
    LongAccumulator acc = ssc.sparkContext().sc().longAccumulator();
    acc.setValue(0);
      
    List<TopicPartition> topicPartitions = new ArrayList<>();
    topicPartitions.add(new TopicPartition(testTopic, 0));
      
    stream.foreachRDD(rdd -> {
      
        HasOffsetRanges hors = (HasOffsetRanges)(rdd.rdd());
        OffsetRange[] ranges = hors.offsetRanges();
        int rddId = rdd.id();
        rdd.foreachPartition(iter -> {
            int partitionId = TaskContext.getPartitionId();
            String partitionFile = String.format("%d_%d_%d-%d", 
                    rddId, partitionId, ranges[partitionId].fromOffset(), ranges[partitionId].untilOffset());
            File dumpFile = new File(directory + "/" + partitionFile);
            if (dumpFile.exists()) {
                dumpFile.delete();
            } else {
                dumpFile.createNewFile();
            }
      
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(dumpFile)));
            while(iter.hasNext()) {
                ConsumerRecord<String, String> record = iter.next();
                writer.write(record.key() + "\t" + record.value() + "\n");
            }
            writer.flush();
            writer.close();
      
        });
        // Busy-wait until every earlier RDD (0 .. rddId-1) has committed, so
        // this RDD's commit cannot overtake its predecessors.
        while(rddId != acc.value()) {;}
      
        CanCommitOffsets committer = (CanCommitOffsets)stream.inputDStream();
        committer.commitAsync(ranges, new OffsetCommitCallback() {
      
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                // Write a marker file recording the committed offset, then
                // update the accumulator so the next RDD's commit can proceed.
                for (TopicPartition topicPartition : topicPartitions) {
      
                    long offset = offsets.get(topicPartition).offset();
                    String marker = String.format("%d_%d_%d_-%d", 
                            System.currentTimeMillis(), rddId, topicPartition.partition(), offset);
                    File offsetMarker = new File(directory + "/" + marker);
                    if (offsetMarker.exists()) {
                        offsetMarker.delete();
                    } else {
                        try {
                            offsetMarker.createNewFile();
                        } catch (IOException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }	
                }
                acc.add(1);
            }
        });
    });
    ssc.start();
    try {
        ssc.awaitTermination();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    

    This approach preserves concurrency, but compute resources are wasted while jobs busy-wait for their turn to commit offsets, and polling the accumulator frequently adds extra computation and network overhead.

  • Configure the streaming application with the FIFO scheduler, as sketched below. This approach does not waste compute resources, but concurrency is limited and the processing rate may drop.
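
    The original post does not show the corresponding configuration; a minimal sketch, reusing the setup from the repro above (FIFO is in fact Spark's default scheduling mode, and spark.streaming.concurrentJobs defaults to 1):

    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Kafka Debuger");
    // FIFO is Spark's default, so simply omitting the FAIR settings
    // (spark.scheduler.mode, spark.scheduler.allocation.file and the scheduler
    // pool) would have the same effect; it is set explicitly here for clarity.
    conf.set("spark.scheduler.mode", "FIFO");
    // spark.streaming.concurrentJobs is left at its default of 1, so batch jobs,
    // and therefore their offset commits, complete strictly in generation order.
    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));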