Перейти к основному содержимому

Ingestion Layer — Слой приёма и обработки

Версия: 1.0
Дата: 19.04.2026
Статус: Черновик

Обзор

Ingestion Layer выполняет нормализацию разнородных данных в единый формат согласно разделу "### 2. Ingestion layer (Слой приёма и обработки)" в Архитектура платформы. Реализует ключевую задачу: "Один отель от трёх поставщиков должен стать одной записью в системе с объединёнными данными." Обрабатывает данные от Stuba (XML/FTP), HomeToGo (GraphQL) и других поставщиков из раздела "### 1. Suppliers (Поставщики данных)" в Архитектура платформы.

Архитектура обработки: См. диаграмму vitrip_ingestion_architecture.jpg — NATS → Kafka pipeline с Rust компонентами для парсинга и матчинга.

Архитектура Ingestion Layer vitrip.store

Поток данных: См. диаграмму vitrip_ingestion_data_flow.jpg — детальный поток от поставщиков до PostgreSQL схем из Storage Layer.

Поток данных Ingestion Layer vitrip.store

Алгоритм матчинга: См. диаграмму vitrip_hotel_matching.jpg — нечёткий поиск дубликатов отелей с confidence scores для hotels.supplier_mapping.

Алгоритм матчинга отелей vitrip.store

Архитектура компонентов

Message Queue Pipeline

Технологии: NATS → Kafka согласно технологическому стеку в Архитектура платформы: "Message queue | NATS → Kafka | Асинхронность, гарантированная доставка".

NATS (Входной слой)

// Структура для входящих сообщений от поставщиков
#[derive(Debug, Deserialize)]
pub struct SupplierMessage {
pub supplier_id: String, // "stuba", "homeogo", "ratehawk"
pub message_type: MessageType, // "hotel_data", "price_update", "availability"
pub payload: serde_json::Value, // Сырые данные поставщика
pub timestamp: chrono::DateTime<chrono::Utc>,
pub correlation_id: String, // Для трассировки
}

#[derive(Debug, Deserialize)]
pub enum MessageType {
HotelData, // Статические данные отелей
PriceUpdate, // Обновления цен
AvailabilityUpdate, // Доступность номеров
}

Kafka (Обработка и буферизация)

# Kafka Topics структура
topics:
# Raw data from suppliers
hotel-data-raw:
partitions: 12 # По supplier_id для параллелизма
replication: 3
retention: 7d # Исходные данные на неделю

# Normalized data
hotel-data-normalized:
partitions: 6
replication: 3
retention: 30d # Нормализованные данные на месяц

# Matched hotels
hotel-data-matched:
partitions: 3
replication: 3
retention: 90d # Матчинг результаты на квартал

# Price updates (high frequency)
price-updates:
partitions: 24 # Высокочастотные обновления
replication: 3
retention: 1d

# Dead letter queue
ingestion-dlq:
partitions: 3
replication: 3
retention: 30d # Ошибки обработки

Rust Data Processing

Технологии: Rust согласно технологическому стеку в Архитектура платформы: "Data processing | Rust | CPU-интенсивные задачи, обработка больших объёмов".

Parser Architecture

// Trait для унифицированного парсинга
pub trait SupplierParser: Send + Sync {
type Input;
type Output: NormalizedHotel;

fn parse(&self, input: Self::Input) -> Result<Vec<Self::Output>, ParseError>;
fn validate(&self, hotel: &Self::Output) -> Result<(), ValidationError>;
}

// Нормализованная структура отеля
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NormalizedHotel {
pub external_id: String,
pub supplier_id: String,
pub name: String,
pub address: Address,
pub coordinates: Option<GeoPoint>,
pub star_rating: Option<f32>,
pub amenities: Vec<String>,
pub rooms: Vec<Room>,
pub photos: Vec<Photo>,
pub description: Option<Description>,
pub chain_info: Option<ChainInfo>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Address {
pub street: Option<String>,
pub city: String,
pub region: Option<String>,
pub country: String,
pub postal_code: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeoPoint {
pub latitude: f64,
pub longitude: f64,
}

Stuba XML Parser

Источник: XML/FTP массовые выгрузки согласно разделу "### 1. Suppliers (Поставщики данных)" в Архитектура платформы.

// Парсер для Stuba XML данных
pub struct StubaXmlParser {
xml_reader: quick_xml::Reader<BufReader<File>>,
validation_rules: ValidationRules,
}

impl SupplierParser for StubaXmlParser {
type Input = PathBuf; // Путь к XML файлу
type Output = NormalizedHotel;

fn parse(&self, xml_path: Self::Input) -> Result<Vec<Self::Output>, ParseError> {
let mut hotels = Vec::new();
let mut buf = Vec::new();

loop {
match self.xml_reader.read_event(&mut buf)? {
Event::Start(ref e) if e.name() == b"hotel" => {
let hotel = self.parse_hotel_element()?;
self.validate(&hotel)?;
hotels.push(hotel);
}
Event::Eof => break,
_ => (),
}
buf.clear();
}

Ok(hotels)
}

fn validate(&self, hotel: &Self::Output) -> Result<(), ValidationError> {
if hotel.name.trim().is_empty() {
return Err(ValidationError::MissingName);
}

if hotel.address.city.trim().is_empty() {
return Err(ValidationError::MissingCity);
}

if let Some(ref coords) = hotel.coordinates {
if coords.latitude.abs() > 90.0 || coords.longitude.abs() > 180.0 {
return Err(ValidationError::InvalidCoordinates);
}
}

Ok(())
}
}

// Специфичная обработка Stuba структур
impl StubaXmlParser {
fn parse_hotel_element(&self) -> Result<NormalizedHotel, ParseError> {
// Парсинг специфичных для Stuba элементов:
// <hotel id="..." name="...">
// <location>
// <coordinates lat="..." lng="..." />
// <address>...</address>
// </location>
// <amenities>
// <amenity code="..." />
// </amenities>
// </hotel>
}
}

HomeToGo GraphQL Parser

Источник: GraphQL API согласно разделу "### 1. Suppliers (Поставщики данных)" в Архитектура платформы.

// Парсер для HomeToGo GraphQL ответов
pub struct HomeToGoParser {
client: reqwest::Client,
api_key: String,
rate_limiter: RateLimiter,
}

#[derive(Debug, Deserialize)]
pub struct HomeToGoResponse {
pub data: HomeToGoData,
pub errors: Option<Vec<GraphQLError>>,
}

#[derive(Debug, Deserialize)]
pub struct HomeToGoData {
pub accommodations: Vec<HomeToGoAccommodation>,
}

impl SupplierParser for HomeToGoParser {
type Input = GraphQLQuery;
type Output = NormalizedHotel;

fn parse(&self, query: Self::Input) -> Result<Vec<Self::Output>, ParseError> {
let response: HomeToGoResponse = self.execute_graphql_query(query)?;

if let Some(errors) = response.errors {
return Err(ParseError::GraphQLErrors(errors));
}

let mut hotels = Vec::new();
for accommodation in response.data.accommodations {
let hotel = self.convert_to_normalized(accommodation)?;
self.validate(&hotel)?;
hotels.push(hotel);
}

Ok(hotels)
}
}

// GraphQL запросы для HomeToGo
const HOTELS_QUERY: &str = r#"
query GetHotels($region: String!, $limit: Int!) {
accommodations(region: $region, limit: $limit) {
id
name
description
location {
latitude
longitude
address
city
country
}
amenities
images
rating
}
}
"#;

Hotel Matching Engine

Ключевая задача: Реализация "Один отель от трёх поставщиков должен стать одной записью в системе с объединёнными данными" из Архитектура платформы.

Алгоритм матчинга

// Матчер отелей с использованием нечётких алгоритмов
pub struct HotelMatcher {
similarity_threshold: f32, // Минимальный порог схожести (0.8)
distance_threshold: f64, // Максимальное расстояние в метрах (100m)
name_weight: f32, // Вес названия в общем скоре (0.4)
location_weight: f32, // Вес локации в общем скоре (0.4)
address_weight: f32, // Вес адреса в общем скоре (0.2)
}

#[derive(Debug)]
pub struct MatchCandidate {
pub hotel_id: uuid::Uuid, // ID в hotels.properties
pub confidence: f32, // 0.0-1.0 confidence score
pub match_factors: MatchFactors,
}

#[derive(Debug)]
pub struct MatchFactors {
pub name_similarity: f32,
pub distance_meters: f64,
pub address_similarity: f32,
pub amenities_overlap: f32,
}

impl HotelMatcher {
pub fn find_matches(&self, candidate: &NormalizedHotel) -> Result<Vec<MatchCandidate>, MatchError> {
// 1. Geographical pre-filtering (в радиусе 5km)
let nearby_hotels = self.find_nearby_hotels(&candidate.coordinates, 5000.0)?;

// 2. Вычисление similarity scores для каждого кандидата
let mut matches = Vec::new();
for existing_hotel in nearby_hotels {
let match_factors = self.calculate_similarity(candidate, &existing_hotel);
let confidence = self.calculate_confidence_score(&match_factors);

if confidence >= self.similarity_threshold {
matches.push(MatchCandidate {
hotel_id: existing_hotel.id,
confidence,
match_factors,
});
}
}

// 3. Сортировка по убыванию confidence
matches.sort_by(|a, b| b.confidence.partial_cmp(&a.confidence).unwrap());

Ok(matches)
}

fn calculate_similarity(&self, candidate: &NormalizedHotel, existing: &ExistingHotel) -> MatchFactors {
// Similarity по названию (Jaro-Winkler algorithm)
let name_similarity = jaro_winkler(&candidate.name, &existing.name);

// Географическое расстояние
let distance_meters = if let (Some(c_coords), Some(e_coords)) = (&candidate.coordinates, &existing.coordinates) {
haversine_distance(c_coords, e_coords)
} else {
f64::INFINITY
};

// Similarity адреса
let address_similarity = jaro_winkler(&candidate.address.to_string(), &existing.address);

// Пересечение amenities
let amenities_overlap = self.calculate_amenities_overlap(&candidate.amenities, &existing.amenities);

MatchFactors {
name_similarity,
distance_meters,
address_similarity,
amenities_overlap,
}
}

fn calculate_confidence_score(&self, factors: &MatchFactors) -> f32 {
// Взвешенная сумма всех факторов
let mut score = 0.0;

// Название (вес 40%)
score += factors.name_similarity * self.name_weight;

// Локация (вес 40%)
let distance_score = if factors.distance_meters <= self.distance_threshold {
1.0 - (factors.distance_meters / self.distance_threshold) as f32
} else {
0.0
};
score += distance_score * self.location_weight;

// Адрес (вес 20%)
score += factors.address_similarity * self.address_weight;

// Бонус за совпадение amenities
score += factors.amenities_overlap * 0.1;

score.min(1.0) // Максимум 1.0
}
}

// Jaro-Winkler string similarity
fn jaro_winkler(s1: &str, s2: &str) -> f32 {
strsim::jaro_winkler(s1, s2) as f32
}

// Haversine formula для географических расстояний
fn haversine_distance(p1: &GeoPoint, p2: &GeoPoint) -> f64 {
const EARTH_RADIUS: f64 = 6371000.0; // метры

let lat1_rad = p1.latitude.to_radians();
let lat2_rad = p2.latitude.to_radians();
let delta_lat = (p2.latitude - p1.latitude).to_radians();
let delta_lon = (p2.longitude - p1.longitude).to_radians();

let a = (delta_lat / 2.0).sin().powi(2) +
lat1_rad.cos() * lat2_rad.cos() * (delta_lon / 2.0).sin().powi(2);
let c = 2.0 * a.sqrt().atan2((1.0 - a).sqrt());

EARTH_RADIUS * c
}

Обработка данных

Data Processing Pipeline

// Основной обработчик входящих сообщений
pub struct DataProcessor {
parsers: HashMap<String, Box<dyn SupplierParser>>,
matcher: HotelMatcher,
db_pool: sqlx::Pool<sqlx::Postgres>,
redis: redis::Client,
producer: FutureProducer,
}

impl DataProcessor {
pub async fn process_message(&self, message: SupplierMessage) -> Result<(), ProcessingError> {
match message.message_type {
MessageType::HotelData => self.process_hotel_data(message).await,
MessageType::PriceUpdate => self.process_price_update(message).await,
MessageType::AvailabilityUpdate => self.process_availability_update(message).await,
}
}

async fn process_hotel_data(&self, message: SupplierMessage) -> Result<(), ProcessingError> {
// 1. Получение соответствующего парсера
let parser = self.parsers.get(&message.supplier_id)
.ok_or(ProcessingError::UnknownSupplier)?;

// 2. Парсинг входных данных
let normalized_hotels = parser.parse(message.payload)?;

for hotel in normalized_hotels {
// 3. Поиск существующих матчей
let matches = self.matcher.find_matches(&hotel).await?;

if let Some(best_match) = matches.first() {
// 4a. Обновление существующего отеля
self.update_existing_hotel(&hotel, best_match).await?;

// 4b. Запись в supplier_mapping таблицу из reference/storage.md
self.create_supplier_mapping(&hotel, best_match.hotel_id, best_match.confidence).await?;
} else {
// 5a. Создание нового отеля в hotels.properties
let new_hotel_id = self.create_new_hotel(&hotel).await?;

// 5b. Запись в supplier_mapping
self.create_supplier_mapping(&hotel, new_hotel_id, 1.0).await?;
}
}

Ok(())
}

// Интеграция с hotels.properties из reference/storage.md
async fn create_new_hotel(&self, hotel: &NormalizedHotel) -> Result<uuid::Uuid, ProcessingError> {
let hotel_id = uuid::Uuid::new_v4();

sqlx::query!(
r#"
INSERT INTO hotels.properties (
id, external_id, supplier_id, name, address, coordinates,
star_rating, created_at, updated_at
) VALUES ($1, $2, (SELECT id FROM public.suppliers WHERE name = $3),
$4, $5, ST_MakePoint($6, $7), $8, now(), now())
"#,
hotel_id,
hotel.external_id,
hotel.supplier_id,
hotel.name,
serde_json::to_value(&hotel.address)?,
hotel.coordinates.as_ref().map(|c| c.longitude),
hotel.coordinates.as_ref().map(|c| c.latitude),
hotel.star_rating
).execute(&self.db_pool).await?;

Ok(hotel_id)
}

// Интеграция с hotels.supplier_mapping из reference/storage.md
async fn create_supplier_mapping(&self, hotel: &NormalizedHotel, hotel_id: uuid::Uuid, confidence: f32) -> Result<(), ProcessingError> {
sqlx::query!(
r#"
INSERT INTO hotels.supplier_mapping (
hotel_id, supplier_id, external_id, confidence, verified, created_at
) VALUES (
$1, (SELECT id FROM public.suppliers WHERE name = $2), $3, $4, $5, now()
)
ON CONFLICT (hotel_id, supplier_id)
DO UPDATE SET
external_id = EXCLUDED.external_id,
confidence = EXCLUDED.confidence,
verified = EXCLUDED.verified
"#,
hotel_id,
hotel.supplier_id,
hotel.external_id,
confidence,
confidence >= 0.95 // Автоматическая верификация для высокого confidence
).execute(&self.db_pool).await?;

Ok(())
}
}

Price Updates Processing

// Обработка частых обновлений цен
pub struct PriceUpdateProcessor {
redis: redis::Client,
db_pool: sqlx::Pool<sqlx::Postgres>,
}

impl PriceUpdateProcessor {
pub async fn process_price_update(&self, message: SupplierMessage) -> Result<(), ProcessingError> {
let price_data: PriceUpdateData = serde_json::from_value(message.payload)?;

// 1. Найти hotel_id по external_id и supplier_id
let hotel_id = self.resolve_hotel_id(&price_data.external_id, &message.supplier_id).await?;

// 2. Обновить Redis cache согласно naming convention из reference/storage.md
for room_price in price_data.room_prices {
let redis_key = format!("hotel:prices:{}:{}", hotel_id, price_data.date.format("%Y-%m-%d"));

let mut con = self.redis.get_async_connection().await?;
con.hset_multiple(&redis_key, &[
(&room_price.room_type, &room_price.price.to_string()),
("currency", &price_data.currency),
("updated_at", &chrono::Utc::now().to_rfc3339()),
]).await?;

// TTL 5-15 минут согласно reference/storage.md
con.expire(&redis_key, 900).await?;
}

// 3. Опционально: персистентное сохранение в PostgreSQL для аналитики
if price_data.persist {
self.save_price_history(&hotel_id, &price_data).await?;
}

Ok(())
}
}

Мониторинг и error handling

Metrics and Observability

// Prometheus метрики для мониторинга
pub struct IngestionMetrics {
pub messages_processed: prometheus::CounterVec,
pub processing_duration: prometheus::HistogramVec,
pub matching_confidence: prometheus::HistogramVec,
pub parse_errors: prometheus::CounterVec,
pub match_success_rate: prometheus::GaugeVec,
}

impl IngestionMetrics {
pub fn new() -> Self {
Self {
messages_processed: prometheus::register_counter_vec!(
"ingestion_messages_processed_total",
"Total number of processed messages",
&["supplier", "message_type", "status"]
).unwrap(),

processing_duration: prometheus::register_histogram_vec!(
"ingestion_processing_duration_seconds",
"Time spent processing messages",
&["supplier", "stage"],
prometheus::exponential_buckets(0.001, 2.0, 15).unwrap()
).unwrap(),

matching_confidence: prometheus::register_histogram_vec!(
"ingestion_matching_confidence",
"Hotel matching confidence scores",
&["supplier"],
prometheus::linear_buckets(0.0, 0.1, 10).unwrap()
).unwrap(),

parse_errors: prometheus::register_counter_vec!(
"ingestion_parse_errors_total",
"Number of parsing errors",
&["supplier", "error_type"]
).unwrap(),
}
}
}

Error Handling and Dead Letter Queue

// Обработка ошибок и DLQ
pub struct ErrorHandler {
dlq_producer: FutureProducer,
metrics: Arc<IngestionMetrics>,
max_retries: usize,
}

impl ErrorHandler {
pub async fn handle_processing_error(&self, message: SupplierMessage, error: ProcessingError) {
self.metrics.parse_errors
.with_label_values(&[&message.supplier_id, &error.error_type()])
.inc();

// Отправка в Dead Letter Queue для ручного анализа
let dlq_record = FutureRecord::to("ingestion-dlq")
.key(&message.correlation_id)
.payload(&serde_json::to_string(&DeadLetterMessage {
original_message: message,
error: error.to_string(),
timestamp: chrono::Utc::now(),
retry_count: self.max_retries,
}).unwrap());

if let Err(e) = self.dlq_producer.send(dlq_record, Timeout::After(Duration::from_secs(5))).await {
error!("Failed to send message to DLQ: {:?}", e);
}
}
}

#[derive(Debug, Serialize)]
pub struct DeadLetterMessage {
pub original_message: SupplierMessage,
pub error: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub retry_count: usize,
}

Performance Optimization

// Конфигурация производительности
#[derive(Debug, Clone)]
pub struct PerformanceConfig {
// Kafka consumer
pub consumer_batch_size: usize, // 1000 сообщений за раз
pub consumer_threads: usize, // 8 worker threads
pub consumer_timeout: Duration, // 5 секунд таймаут

// Database connection pool
pub db_pool_size: u32, // 20 соединений
pub db_idle_timeout: Duration, // 10 минут idle

// Redis connection pool
pub redis_pool_size: usize, // 50 соединений

// Processing
pub batch_insert_size: usize, // 100 отелей за транзакцию
pub matching_cache_ttl: Duration, // 1 час кеш матчинга
}

impl Default for PerformanceConfig {
fn default() -> Self {
Self {
consumer_batch_size: 1000,
consumer_threads: 8,
consumer_timeout: Duration::from_secs(5),
db_pool_size: 20,
db_idle_timeout: Duration::from_secs(600),
redis_pool_size: 50,
batch_insert_size: 100,
matching_cache_ttl: Duration::from_secs(3600),
}
}
}

Развёртывание и масштабирование

Kubernetes Deployment

# Ingestion Processor Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: ingestion-processor
spec:
replicas: 3
selector:
matchLabels:
app: ingestion-processor
template:
metadata:
labels:
app: ingestion-processor
spec:
containers:
- name: processor
image: vitrip/ingestion-processor:v1.0
env:
- name: KAFKA_BROKERS
value: "kafka-cluster:9092"
- name: POSTGRES_URL
valueFrom:
secretKeyRef:
name: postgres-credentials
key: url
- name: REDIS_URL
value: "redis:6379"
- name: RUST_LOG
value: "info"
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "1000m"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 5

---
# HorizontalPodAutoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: ingestion-processor-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: ingestion-processor
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Pods
pods:
metric:
name: kafka_consumer_lag
target:
type: AverageValue
averageValue: "1000" # Максимум 1000 сообщений lag

Kafka Configuration

# Kafka Cluster для высокой пропускной способности
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: vitrip-kafka
spec:
kafka:
version: 3.4.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
inter.broker.protocol.version: "3.4"
# Performance tuning
num.network.threads: 8
num.io.threads: 16
socket.send.buffer.bytes: 102400
socket.receive.buffer.bytes: 102400
socket.request.max.bytes: 104857600
num.partitions: 12
storage:
type: persistent-claim
size: 1Ti
class: fast-ssd
zookeeper:
replicas: 3
storage:
type: persistent-claim
size: 100Gi
class: fast-ssd

Связанная документация