나는 데이터 엔지니어 면접에 갔다. 면접관이 나에게 질문을했다. 그는 나에게 상황을 알려주고 해당 시스템의 데이터 흐름을 설계하도록 요청했다. 나는 그것을 해결했지만 그는 내 솔루션을 좋아하지 않아 실패했다. 난 당신이 그 도전을 해결하는 방법을 더 나은 아이디어가 있는지 알고 싶습니다.
문제는 :
Google 시스템은 4 가지 데이터 스트림을받습니다. 데이터에는 차량 ID, 속도 및 지리적 위치 조정이 포함됩니다. 모든 차량은 1 분에 한 번씩 데이터를 보냅니다. 특정 시내와 특정 도로 또는 차량 또는 다른 물체와의 연결은 없습니다. 조정을 수락하고 도로 섹션 이름을 반환하는 함수가 있습니다. 5 분당 도로 구간당 평균 속도를 알아야합니다. 마지막으로 결과를 Kafka에 작성하려고합니다.
그래서 내 해결책은 다음과 같습니다.
먼저 모든 데이터를 Kafka 클러스터에 하나의 주제로 작성하고 위도의 5-6 첫 번째 자릿수를 경도의 5-6 첫 번째 자릿수로 나눕니다. 그런 다음 Structured Streaming으로 데이터를 읽고 각 행에 대해 조정으로 도로 섹션 이름을 추가 한 다음 (도로 사전 정의 된 udf가 있음) 도로 섹션 이름별로 데이터를 정리합니다.
조정을 섹션 이름으로 변환 한 후 조정의 5-6 첫 자릿수로 Kafka의 데이터를 분할하기 때문에 조정을 섹션 이름으로 변환 한 후 많은 데이터를 올바른 파티션으로 전송할 필요가 없으므로 colesce () 조작을 활용할 수 있습니다 전체 셔플을 유발하지는 않습니다.
그런 다음 실행기 당 평균 속도를 계산합니다.
전체 프로세스는 5 분마다 발생하며 Append 모드에서 최종 Kafka 싱크에 데이터를 씁니다.
면접관은 내 솔루션을 좋아하지 않았습니다. 누구든지 그것을 개선하는 방법이나 완전히 다르고 더 나은 아이디어를 제안 할 수 있습니까?
답변
나는이 질문이 매우 흥미롭고 시도해 볼 생각을했다.
추가로 평가했듯이 다음을 제외하고는 시도 자체가 좋습니다.
위도의 5-6 첫 번째 숫자로 구분되고 경도의 5-6 첫 번째 숫자에 연결됨
위도와 경도를 기준으로 도로 섹션 ID / 이름을 가져 오는 방법이 이미있는 경우 먼저 해당 메소드를 호출하고 도로 섹션 id / name을 사용하여 데이터를 먼저 분할하십시오.
그 후 모든 것이 매우 쉬워서 토폴로지가
Merge all four streams ->
Select key as the road section id/name ->
Group the stream by Key ->
Use time windowed aggregation for the given time ->
Materialize it to a store.
(자세한 설명은 아래 코드의 주석에서 찾을 수 있습니다. 불분명 한 것이 있으면 문의하십시오)
이 답변의 끝에 코드를 추가했습니다. 평균 대신 시연하기 쉬운 합계를 사용했습니다. 추가 데이터를 저장하여 평균을 계산할 수 있습니다.
의견에 대한 답변을 자세히 설명했습니다. 다음은 코드에서 생성 된 토폴로지 다이어그램입니다 ( https://zz85.github.io/kafka-streams-viz/ 덕분에 )
토폴로지 :
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class VehicleStream {
// 5 minutes aggregation window
private static final long AGGREGATION_WINDOW = 5 * 50 * 1000L;
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
// Setting configs, change accordingly
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "vehicle.stream.app");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,kafka2:19092");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// initializing a streambuilder for building topology.
final StreamsBuilder builder = new StreamsBuilder();
// Our initial 4 streams.
List<String> streamInputTopics = Arrays.asList(
"vehicle.stream1", "vehicle.stream2",
"vehicle.stream3", "vehicle.stream4"
);
/*
* Since there is no connection between a specific stream
* to a specific road or vehicle or anything else,
* we can take all four streams as a single stream
*/
KStream<String, String> source = builder.stream(streamInputTopics);
/*
* The initial key is unimportant (which can be ignored),
* Instead, we will be using the section name/id as key.
* Data will contain comma separated values in following format.
* VehicleId,Speed,Latitude,Longitude
*/
WindowBytesStoreSupplier windowSpeedStore = Stores.persistentWindowStore(
"windowSpeedStore",
AGGREGATION_WINDOW,
2, 10, true
);
source
.peek((k, v) -> printValues("Initial", k, v))
// First, we rekey the stream based on the road section.
.selectKey(VehicleStream::selectKeyAsRoadSection)
.peek((k, v) -> printValues("After rekey", k, v))
.groupByKey()
.windowedBy(TimeWindows.of(AGGREGATION_WINDOW))
.aggregate(
() -> "0.0", // Initialize
/*
* I'm using summing here for the aggregation as that's easier.
* It can be converted to average by storing extra details on number of records, etc..
*/
(k, v, previousSpeed) -> // Aggregator (summing speed)
String.valueOf(
Double.parseDouble(previousSpeed) +
VehicleSpeed.getVehicleSpeed(v).speed
),
Materialized.as(windowSpeedStore)
);
// generating the topology
final Topology topology = builder.build();
System.out.print(topology.describe());
// constructing a streams client with the properties and topology
final KafkaStreams streams = new KafkaStreams(topology, properties);
final CountDownLatch latch = new CountDownLatch(1);
// attaching shutdown handler
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
private static void printValues(String message, String key, Object value) {
System.out.printf("===%s=== key: %s value: %s%n", message, key, value.toString());
}
private static String selectKeyAsRoadSection(String key, String speedValue) {
// Would make more sense when it's the section id, rather than a name.
return coordinateToRoadSection(
VehicleSpeed.getVehicleSpeed(speedValue).latitude,
VehicleSpeed.getVehicleSpeed(speedValue).longitude
);
}
private static String coordinateToRoadSection(String latitude, String longitude) {
// Dummy function
return "Area 51";
}
public static class VehicleSpeed {
public String vehicleId;
public double speed;
public String latitude;
public String longitude;
public static VehicleSpeed getVehicleSpeed(String data) {
return new VehicleSpeed(data);
}
public VehicleSpeed(String data) {
String[] dataArray = data.split(",");
this.vehicleId = dataArray[0];
this.speed = Double.parseDouble(dataArray[1]);
this.latitude = dataArray[2];
this.longitude = dataArray[3];
}
@Override
public String toString() {
return String.format("veh: %s, speed: %f, latlong : %s,%s", vehicleId, speed, latitude, longitude);
}
}
}
답변
그와 같은 문제는 단순 해 보이며 제공된 솔루션은 이미 의미가 있습니다. 면접관이 귀하가 중점을 둔 솔루션의 설계 및 성능 또는 결과의 정확성에 대해 우려하고 있는지 궁금합니다. 다른 사람들은 코드, 디자인 및 성능에 중점을 두었으므로 정확성에 중점을 둘 것입니다.
스트리밍 솔루션
데이터가 흐르면서 도로의 평균 속도를 대략적으로 추정 할 수 있습니다. 이 추정은 정체를 감지하는 데 도움이되지만 속도 제한을 결정하는 데에는 도움이되지 않습니다.
- 4 개의 데이터 스트림을 모두 결합하십시오.
- 5 분의 창을 만들어 5 분 안에 4 개의 스트림 모두에서 데이터를 캡처하십시오.
- 거리 이름과 도시 이름을 얻으려면 좌표에 UDF를 적용하십시오. 거리 이름은 여러 도시에 중복되는 경우가 많으므로 city-name + street-name을 키로 사용합니다.
- 다음과 같은 구문으로 평균 속도를 계산하십시오.
vehicle_street_speed
.groupBy($"city_name_street_name")
.agg(
avg($"speed").as("avg_speed")
)
5. write the result to the Kafka Topic
배치 솔루션
표본 크기가 작기 때문에이 추정값은 해제됩니다. 속도 제한을보다 정확하게 결정하려면 월 / 분기 / 년 전체 데이터에 대한 일괄 처리가 필요합니다.
-
데이터 레이크 (또는 Kafka Topic)에서 연도 데이터 읽기
-
거리 이름과 도시 이름을 얻으려면 좌표에 UDF를 적용하십시오.
-
다음과 같은 구문으로 평균 속도를 계산하십시오.
vehicle_street_speed
.groupBy($"city_name_street_name")
.agg(
avg($"speed").as("avg_speed")
)
- 데이터 레이크에 결과를 씁니다.
이보다 정확한 속도 제한을 기반으로 스트리밍 애플리케이션에서 느린 트래픽을 예측할 수 있습니다.
답변
파티셔닝 전략에 몇 가지 문제가 있습니다.
-
위도 길이의 첫 5-6 자리를 기준으로 데이터를 분할한다고하면 사전에 카프카 파티션 수를 결정할 수 없습니다. 일부 도로 구간에서는 다른 구간보다 많은 양의 데이터를 볼 수 있습니다.
-
그리고 키 조합은 어쨌든 동일한 파티션에서 동일한 도로 섹션 데이터를 보장하지 않으므로 셔플 링이 발생하지 않을 것이라고 확신 할 수 없습니다.
정보가 주어진 IMO는 전체 데이터 파이프 라인을 설계하기에 충분하지 않습니다. 파이프 라인을 설계 할 때 데이터를 분할하는 방법이 중요한 역할을하기 때문입니다. 차량 수, 입력 데이터 스트림의 크기, 스트림 수가 고정되어 있거나 향후에 증가 할 수있는 것처럼 수신중인 데이터에 대해 더 문의해야합니다. 수신하는 입력 데이터 스트림이 kafka 스트림입니까? 5 분 동안 얼마나 많은 데이터를 받습니까?
- 이제 kafka 또는 4 개의 파티션에서 4 개의 주제에 4 개의 스트림을 작성하고 특정 키가 없지만 데이터가 일부 데이터 센터 키를 기준으로 파티션되거나 해시 파티션 된 것으로 가정합니다. 그렇지 않으면 다른 kafka 스트림에서 데이터를 중복 제거하고 파티셔닝하는 대신 데이터 측에서 수행해야합니다.
- 다른 데이터 센터에서 데이터를 수신하는 경우 데이터를 하나의 클러스터로 가져와야하며이를 위해 Kafka 미러 메이커 또는 이와 유사한 것을 사용할 수 있습니다.
- 하나의 클러스터에 모든 데이터를 보유한 후 요구 사항에 따라 5 분의 트리거 간격과 워터 마크를 사용하여 구조화 된 스트리밍 작업을 실행할 수 있습니다.
- 당신의 조합을 사용할 수 있습니다 셔플의 평균과 피할 많이 계산
mapValues
하고reduceByKey
대신 GROUPBY의를. 이것을 참조하십시오 . - 처리 후 kafka sink에 데이터를 쓸 수 있습니다.
답변
이 솔루션에서 볼 수있는 주요 문제는 다음과 같습니다.
- 지도의 6 자리 사각형의 가장자리에있는 도로 섹션에는 여러 주제 파티션의 데이터가 있으며 평균 속도가 여러 개입니다.
- Kafka 파티션의 수집 데이터 크기가 불균형 할 수 있습니다 (도시 대 사막). 자동차 ID 첫 자릿수로 분할하는 것이 좋은 아이디어 IMO 일 수 있습니다.
- 내가 합병 한 부분을 따랐다는 것은 확실하지 않지만 문제가있는 것 같습니다.
해결책은 Kafka 스트림에서 읽기-> UDF-> 그룹 별 도로 섹션-> 평균-> Kafka 스트림에 쓰기해야한다고 말하고 싶습니다.
답변
내 디자인은
- 도로 수
- 차량 수
- 좌표에서 도로의 계산 비용
개수에 관계없이 스케일을 조정하려는 경우 디자인은 다음과 같습니다.
이 디자인에 대한 교차 관심사-
- 입력 스트림의 내구성 상태 유지 (입력이 kafka 인 경우 Kafka와 함께 또는 외부에서 오프셋을 저장할 수 있음)
- 주기적으로 체크 포인트 상태를 외부 시스템으로 ( Flink에서 비동기 체크 포인트 장벽 사용을 선호합니다 )
이 디자인에서 가능한 실질적인 개선 사항-
- 가능한 경우 도로를 기반으로 도로 섹션 매핑 기능 캐싱
- 누락 된 핑 처리 (실제로 모든 핑을 사용할 수있는 것은 아님)
- 도로의 곡률 고려 (베어링 및 고도 고려)