[apache-spark] 도로의 평균 속도 계산 [닫힘]

나는 데이터 엔지니어 면접에 갔다. 면접관이 나에게 질문을했다. 그는 나에게 상황을 알려주고 해당 시스템의 데이터 흐름을 설계하도록 요청했다. 나는 그것을 해결했지만 그는 내 솔루션을 좋아하지 않아 실패했다. 난 당신이 그 도전을 해결하는 방법을 더 나은 아이디어가 있는지 알고 싶습니다.

문제는 :

Google 시스템은 4 가지 데이터 스트림을받습니다. 데이터에는 차량 ID, 속도 및 지리적 위치 조정이 포함됩니다. 모든 차량은 1 분에 한 번씩 데이터를 보냅니다. 특정 시내와 특정 도로 또는 차량 또는 다른 물체와의 연결은 없습니다. 조정을 수락하고 도로 섹션 이름을 반환하는 함수가 있습니다. 5 분당 도로 구간당 평균 속도를 알아야합니다. 마지막으로 결과를 Kafka에 작성하려고합니다.

여기에 이미지 설명을 입력하십시오

그래서 내 해결책은 다음과 같습니다.

먼저 모든 데이터를 Kafka 클러스터에 하나의 주제로 작성하고 위도의 5-6 첫 번째 자릿수를 경도의 5-6 첫 번째 자릿수로 나눕니다. 그런 다음 Structured Streaming으로 데이터를 읽고 각 행에 대해 조정으로 도로 섹션 이름을 추가 한 다음 (도로 사전 정의 된 udf가 있음) 도로 섹션 이름별로 데이터를 정리합니다.

조정을 섹션 이름으로 변환 한 후 조정의 5-6 첫 자릿수로 Kafka의 데이터를 분할하기 때문에 조정을 섹션 이름으로 변환 한 후 많은 데이터를 올바른 파티션으로 전송할 필요가 없으므로 colesce () 조작을 활용할 수 있습니다 전체 셔플을 유발하지는 않습니다.

그런 다음 실행기 당 평균 속도를 계산합니다.

전체 프로세스는 5 분마다 발생하며 Append 모드에서 최종 Kafka 싱크에 데이터를 씁니다.

여기에 이미지 설명을 입력하십시오

면접관은 내 솔루션을 좋아하지 않았습니다. 누구든지 그것을 개선하는 방법이나 완전히 다르고 더 나은 아이디어를 제안 할 수 있습니까?



답변

나는이 질문이 매우 흥미롭고 시도해 볼 생각을했다.

추가로 평가했듯이 다음을 제외하고는 시도 자체가 좋습니다.

위도의 5-6 첫 번째 숫자로 구분되고 경도의 5-6 첫 번째 숫자에 연결됨

위도와 경도를 기준으로 도로 섹션 ID / 이름을 가져 오는 방법이 이미있는 경우 먼저 해당 메소드를 호출하고 도로 섹션 id / name을 사용하여 데이터를 먼저 분할하십시오.

그 후 모든 것이 매우 쉬워서 토폴로지가

Merge all four streams ->
Select key as the road section id/name ->
Group the stream by Key ->
Use time windowed aggregation for the given time ->
Materialize it to a store.

(자세한 설명은 아래 코드의 주석에서 찾을 수 있습니다. 불분명 한 것이 있으면 문의하십시오)

이 답변의 끝에 코드를 추가했습니다. 평균 대신 시연하기 쉬운 합계를 사용했습니다. 추가 데이터를 저장하여 평균을 계산할 수 있습니다.

의견에 대한 답변을 자세히 설명했습니다. 다음은 코드에서 생성 된 토폴로지 다이어그램입니다 ( https://zz85.github.io/kafka-streams-viz/ 덕분에 )

토폴로지 :

토폴로지 다이어그램

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;

    public class VehicleStream {
        // 5 minutes aggregation window
        private static final long AGGREGATION_WINDOW = 5 * 50 * 1000L;

        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();

            // Setting configs, change accordingly
            properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "vehicle.stream.app");
            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,kafka2:19092");
            properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // initializing  a streambuilder for building topology.
            final StreamsBuilder builder = new StreamsBuilder();

            // Our initial 4 streams.
            List<String> streamInputTopics = Arrays.asList(
                    "vehicle.stream1", "vehicle.stream2",
                    "vehicle.stream3", "vehicle.stream4"
            );
            /*
             * Since there is no connection between a specific stream
             * to a specific road or vehicle or anything else,
             * we can take all four streams as a single stream
             */
            KStream<String, String> source = builder.stream(streamInputTopics);

            /*
             * The initial key is unimportant (which can be ignored),
             * Instead, we will be using the section name/id as key.
             * Data will contain comma separated values in following format.
             * VehicleId,Speed,Latitude,Longitude
             */
            WindowBytesStoreSupplier windowSpeedStore = Stores.persistentWindowStore(
                    "windowSpeedStore",
                    AGGREGATION_WINDOW,
                    2, 10, true
            );
            source
                    .peek((k, v) -> printValues("Initial", k, v))
                    // First, we rekey the stream based on the road section.
                    .selectKey(VehicleStream::selectKeyAsRoadSection)
                    .peek((k, v) -> printValues("After rekey", k, v))
                    .groupByKey()
                    .windowedBy(TimeWindows.of(AGGREGATION_WINDOW))
                    .aggregate(
                            () -> "0.0", // Initialize
                            /*
                             * I'm using summing here for the aggregation as that's easier.
                             * It can be converted to average by storing extra details on number of records, etc..
                             */
                            (k, v, previousSpeed) ->  // Aggregator (summing speed)
                                    String.valueOf(
                                            Double.parseDouble(previousSpeed) +
                                                    VehicleSpeed.getVehicleSpeed(v).speed
                                    ),
                            Materialized.as(windowSpeedStore)
                    );
            // generating the topology
            final Topology topology = builder.build();
            System.out.print(topology.describe());

            // constructing a streams client with the properties and topology
            final KafkaStreams streams = new KafkaStreams(topology, properties);
            final CountDownLatch latch = new CountDownLatch(1);

            // attaching shutdown handler
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);
        }


        private static void printValues(String message, String key, Object value) {
            System.out.printf("===%s=== key: %s value: %s%n", message, key, value.toString());
        }

        private static String selectKeyAsRoadSection(String key, String speedValue) {
            // Would make more sense when it's the section id, rather than a name.
            return coordinateToRoadSection(
                    VehicleSpeed.getVehicleSpeed(speedValue).latitude,
                    VehicleSpeed.getVehicleSpeed(speedValue).longitude
            );
        }

        private static String coordinateToRoadSection(String latitude, String longitude) {
            // Dummy function
            return "Area 51";
        }

        public static class VehicleSpeed {
            public String vehicleId;
            public double speed;
            public String latitude;
            public String longitude;

            public static VehicleSpeed getVehicleSpeed(String data) {
                return new VehicleSpeed(data);
            }

            public VehicleSpeed(String data) {
                String[] dataArray = data.split(",");
                this.vehicleId = dataArray[0];
                this.speed = Double.parseDouble(dataArray[1]);
                this.latitude = dataArray[2];
                this.longitude = dataArray[3];
            }

            @Override
            public String toString() {
                return String.format("veh: %s, speed: %f, latlong : %s,%s", vehicleId, speed, latitude, longitude);
            }
        }
    }


답변

그와 같은 문제는 단순 해 보이며 제공된 솔루션은 이미 의미가 있습니다. 면접관이 귀하가 중점을 둔 솔루션의 설계 및 성능 또는 결과의 정확성에 대해 우려하고 있는지 궁금합니다. 다른 사람들은 코드, 디자인 및 성능에 중점을 두었으므로 정확성에 중점을 둘 것입니다.

스트리밍 솔루션

데이터가 흐르면서 도로의 평균 속도를 대략적으로 추정 할 수 있습니다. 이 추정은 정체를 감지하는 데 도움이되지만 속도 제한을 결정하는 데에는 도움이되지 않습니다.

  1. 4 개의 데이터 스트림을 모두 결합하십시오.
  2. 5 분의 창을 만들어 5 분 안에 4 개의 스트림 모두에서 데이터를 캡처하십시오.
  3. 거리 이름과 도시 이름을 얻으려면 좌표에 UDF를 적용하십시오. 거리 이름은 여러 도시에 중복되는 경우가 많으므로 city-name + street-name을 키로 사용합니다.
  4. 다음과 같은 구문으로 평균 속도를 계산하십시오.

    vehicle_street_speed
      .groupBy($"city_name_street_name")
      .agg(
        avg($"speed").as("avg_speed")
      )

5. write the result to the Kafka Topic

배치 솔루션

표본 크기가 작기 때문에이 추정값은 해제됩니다. 속도 제한을보다 정확하게 결정하려면 월 / 분기 / 년 전체 데이터에 대한 일괄 처리가 필요합니다.

  1. 데이터 레이크 (또는 Kafka Topic)에서 연도 데이터 읽기

  2. 거리 이름과 도시 이름을 얻으려면 좌표에 UDF를 적용하십시오.

  3. 다음과 같은 구문으로 평균 속도를 계산하십시오.


    vehicle_street_speed
      .groupBy($"city_name_street_name")
      .agg(
        avg($"speed").as("avg_speed")
      )

  1. 데이터 레이크에 결과를 씁니다.

이보다 정확한 속도 제한을 기반으로 스트리밍 애플리케이션에서 느린 트래픽을 예측할 수 있습니다.


답변

파티셔닝 전략에 몇 가지 문제가 있습니다.

  • 위도 길이의 첫 5-6 자리를 기준으로 데이터를 분할한다고하면 사전에 카프카 파티션 수를 결정할 수 없습니다. 일부 도로 구간에서는 다른 구간보다 많은 양의 데이터를 볼 수 있습니다.

  • 그리고 키 조합은 어쨌든 동일한 파티션에서 동일한 도로 섹션 데이터를 보장하지 않으므로 셔플 링이 발생하지 않을 것이라고 확신 할 수 없습니다.

정보가 주어진 IMO는 전체 데이터 파이프 라인을 설계하기에 충분하지 않습니다. 파이프 라인을 설계 할 때 데이터를 분할하는 방법이 중요한 역할을하기 때문입니다. 차량 수, 입력 데이터 스트림의 크기, 스트림 수가 고정되어 있거나 향후에 증가 할 수있는 것처럼 수신중인 데이터에 대해 더 문의해야합니다. 수신하는 입력 데이터 스트림이 kafka 스트림입니까? 5 분 동안 얼마나 많은 데이터를 받습니까?

  • 이제 kafka 또는 4 개의 파티션에서 4 개의 주제에 4 개의 스트림을 작성하고 특정 키가 없지만 데이터가 일부 데이터 센터 키를 기준으로 파티션되거나 해시 파티션 된 것으로 가정합니다. 그렇지 않으면 다른 kafka 스트림에서 데이터를 중복 제거하고 파티셔닝하는 대신 데이터 측에서 수행해야합니다.
  • 다른 데이터 센터에서 데이터를 수신하는 경우 데이터를 하나의 클러스터로 가져와야하며이를 위해 Kafka 미러 메이커 또는 이와 유사한 것을 사용할 수 있습니다.
  • 하나의 클러스터에 모든 데이터를 보유한 후 요구 사항에 따라 5 분의 트리거 간격과 워터 마크를 사용하여 구조화 된 스트리밍 작업을 실행할 수 있습니다.
  • 당신의 조합을 사용할 수 있습니다 셔플의 평균과 피할 많이 계산 mapValues하고 reduceByKey대신 GROUPBY의를. 이것을 참조하십시오 .
  • 처리 후 kafka sink에 데이터를 쓸 수 있습니다.

답변

이 솔루션에서 볼 수있는 주요 문제는 다음과 같습니다.

  • 지도의 6 자리 사각형의 가장자리에있는 도로 섹션에는 여러 주제 파티션의 데이터가 있으며 평균 속도가 여러 개입니다.
  • Kafka 파티션의 수집 데이터 크기가 불균형 할 수 있습니다 (도시 대 사막). 자동차 ID 첫 자릿수로 분할하는 것이 좋은 아이디어 IMO ​​일 수 있습니다.
  • 내가 합병 한 부분을 따랐다는 것은 확실하지 않지만 문제가있는 것 같습니다.

해결책은 Kafka 스트림에서 읽기-> UDF-> 그룹 별 도로 섹션-> 평균-> Kafka 스트림에 쓰기해야한다고 말하고 싶습니다.


답변

내 디자인은

  1. 도로 수
  2. 차량 수
  3. 좌표에서 도로의 계산 비용

개수에 관계없이 스케일을 조정하려는 경우 디자인은 다음과 같습니다.
여기에 이미지 설명을 입력하십시오

이 디자인에 대한 교차 관심사-

  1. 입력 스트림의 내구성 상태 유지 (입력이 kafka 인 경우 Kafka와 함께 또는 외부에서 오프셋을 저장할 수 있음)
  2. 주기적으로 체크 포인트 상태를 외부 시스템으로 ( Flink에서 비동기 체크 포인트 장벽 사용을 선호합니다 )

이 디자인에서 가능한 실질적인 개선 사항-

  1. 가능한 경우 도로를 기반으로 도로 섹션 매핑 기능 캐싱
  2. 누락 된 핑 처리 (실제로 모든 핑을 사용할 수있는 것은 아님)
  3. 도로의 곡률 고려 (베어링 및 고도 고려)

답변