Search before asking
Motivation
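IoTDB version
Problem
The Pipe was created and started normally, and data is continuously written to the source database.
However, newly written data is not synchronized to the target through the Pipe immediately. Only after manually executing:
is the previously written data synchronized.
From the observed behavior, the Pipe appears to detect new data only after a TsFile flush, rather than synchronizing in-memory writes in real time.
Steps to reproduce
- Create the Pipe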
package com.xlkh.tsdb.iotdb.plugin.processor;

import com.xlkh.tsdb.iotdb.Globals;
import com.xlkh.tsdb.iotdb.util.TableRowUtil;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsoleProcessor implements PipeProcessor {

  private static final Logger log = LoggerFactory.getLogger(ConsoleProcessor.class);

  // Rows buffered by timestamp; fields arriving in different tablets are merged into one row.
  private static final ConcurrentSkipListMap<Long, Map<String, String>> payloads = new ConcurrentSkipListMap<>();

  // Number of fields the oldest row must reach before it is logged (the "count" attribute).
  private static final AtomicInteger counter = new AtomicInteger(0);

  @Override
  public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {
    // No parameter validation.
  }

  @Override
  public void customize(PipeParameters pipeParameters, PipeProcessorRuntimeConfiguration pipeProcessorRuntimeConfiguration) throws Exception {
    counter.set(Integer.parseInt(pipeParameters.getAttribute().getOrDefault("count", "200000")));
  }

  @Override
  public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
      throws Exception {
    // TableRowUtil.toMap is a project helper (not shown here) that maps each timestamp to its fields.
    tabletInsertionEvent.processTablet((tablet, collector) -> {
      TableRowUtil.toMap(tablet).forEach((ts, fields) ->
          payloads.merge(ts, fields, (oldVal, newVal) -> {
            oldVal.putAll(newVal);
            return oldVal;
          }));
    });

    // firstEntry() returns null on an empty buffer, whereas firstKey() would throw.
    Entry<Long, Map<String, String>> head = payloads.firstEntry();
    if (head != null) {
      Long timestamp = head.getKey();
      log.info("payloads size: {} - {}", timestamp, head.getValue().size());
      if (head.getValue().size() >= counter.get()) {
        Entry<Long, Map<String, String>> firstEntry = payloads.pollFirstEntry();
        if (firstEntry != null) {
          // Log asynchronously so the pipe thread is not blocked by JSON serialization.
          Globals.EXECUTOR.submit(() -> {
            log.info("time: {} - {}", timestamp, System.currentTimeMillis());
            log.info("payloads size: {}", payloads.size());
            log.info("{}", Globals.gson.toJson(firstEntry.getValue()));
          });
        }
      }
    }

    // Always forward the event so the sink still receives it.
    eventCollector.collect(tabletInsertionEvent);
  }

  @Override
  public void process(Event event, EventCollector eventCollector) throws Exception {
    eventCollector.collect(event);
  }

  @Override
  public void close() throws Exception {
    payloads.clear();
    counter.set(200000);
  }
}
- Start the Pipe
DROP PIPE IF EXISTS console_pipe;
DROP PIPEPLUGIN IF EXISTS console_processor;

CREATE PIPEPLUGIN console_processor AS 'com.xlkh.tsdb.iotdb.plugin.processor.ConsoleProcessor'
USING URI 'file:///home/iotdb/apache-iotdb-2.0.8-all-bin/ext/pipe/iotdb-plugin-ext.jar';

CREATE PIPE console_pipe
WITH SOURCE (
  'source' = 'iotdb-source',
  'source.mode' = 'stream',
  'source.pattern' = 'root.s1_5w',
  'history.enable' = 'false',
  'realtime.enable' = 'true',
)
WITH PROCESSOR (
  'count' = '200000',
  'processor' = 'console_processor',
)
WITH SINK (
  'sink' = 'do-nothing-sink',
);
- Continuously write data
INSERT INTO root.test.d1(time, s1) VALUES (NOW(), 190);
- Execute
The previously unsynchronized data then starts synchronizing immediately.
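To help rule out the processor itself as the source of the delay, the buffering logic of ConsoleProcessor can be exercised without IoTDB. The following is only a minimal sketch: the class name RowBuffer and its methods are hypothetical, and it assumes TableRowUtil.toMap (not shown in this issue) returns a map from timestamp to field name/value pairs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * Hypothetical standalone version of the ConsoleProcessor buffering logic.
 * It has no IoTDB dependency; rows are passed in as plain maps.
 */
public class RowBuffer {

  // Rows ordered by timestamp, merged field by field as batches arrive.
  private final ConcurrentSkipListMap<Long, Map<String, String>> payloads =
      new ConcurrentSkipListMap<>();

  // Number of fields the oldest row needs before it is released.
  private final int threshold;

  public RowBuffer(int threshold) {
    this.threshold = threshold;
  }

  /**
   * Merges one batch into the buffer. Returns the oldest row once it holds at
   * least {@code threshold} fields, otherwise null.
   */
  public Map<String, String> offer(Map<Long, Map<String, String>> rows) {
    rows.forEach((ts, fields) ->
        // Copy the incoming fields so the buffered row is always mutable.
        payloads.merge(ts, new HashMap<>(fields), (oldVal, newVal) -> {
          oldVal.putAll(newVal);
          return oldVal;
        }));

    Map.Entry<Long, Map<String, String>> head = payloads.firstEntry();
    if (head == null || head.getValue().size() < threshold) {
      return null;
    }
    // Single-threaded sketch: the head checked above is the entry polled here.
    return payloads.pollFirstEntry().getValue();
  }

  /** Number of timestamps still waiting in the buffer. */
  public int pending() {
    return payloads.size();
  }

  public static void main(String[] args) {
    RowBuffer buffer = new RowBuffer(2);

    Map<Long, Map<String, String>> first = new HashMap<>();
    first.put(1L, Map.of("s1", "190"));
    System.out.println("after first batch: " + buffer.offer(first));

    Map<Long, Map<String, String>> second = new HashMap<>();
    second.put(1L, Map.of("s2", "7"));
    System.out.println("after second batch: " + buffer.offer(second));
  }
}
```

Note that with 'count' = '200000', a row is only logged once 200000 fields share one timestamp, so the "payloads size" log line printed on every tablet event is the more direct indicator of whether realtime events reach the processor before a flush.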
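Additional information
- The Pipe status is normal
- The Pipe reports no errors
- The problem reproduces reliably
- Data has been confirmed as successfully written on the source side
- I would like to confirm whether this is a configuration issue or a known Pipe defect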
Solution
No response
Alternatives
No response
Are you willing to submit a PR?