Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeReportableSubtask;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
Expand Down Expand Up @@ -188,15 +189,17 @@ protected boolean executeOnce() throws Exception {
}
decreaseReferenceCountAndReleaseLastEvent(event, shouldReport);
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
LOGGER.info(
"Temporarily out of memory in pipe event processing, will wait for the memory to release.",
e);
PipeLogger.log(
LOGGER::info,
e,
"Temporarily out of memory in pipe event processing, will wait for the memory to release.");
return false;
} catch (final Exception e) {
if (ExceptionUtils.getRootCause(e) instanceof PipeRuntimeOutOfMemoryCriticalException) {
LOGGER.info(
"Temporarily out of memory in pipe event processing, will wait for the memory to release.",
e);
PipeLogger.log(
LOGGER::info,
e,
"Temporarily out of memory in pipe event processing, will wait for the memory to release.");
return false;
}
if (!isClosed.get()) {
Expand All @@ -210,7 +213,9 @@ protected boolean executeOnce() throws Exception {
ErrorHandlingUtils.getRootCause(e).getMessage()),
e);
} else {
LOGGER.info("Exception in pipe event processing, ignored because pipe is dropped.", e);
LOGGER.info(
"Exception in pipe event processing, ignored because pipe is dropped.{}",
e.getMessage() != null ? " Message: " + e.getMessage() : "");
clearReferenceCountAndReleaseLastEvent(event);
}
}
Expand Down Expand Up @@ -282,15 +287,6 @@ public int getRegionId() {
return regionId;
}

public int getEventCount(final boolean ignoreHeartbeat) {
// Avoid potential NPE in "getPipeName"
final EnrichedEvent event =
lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null;
return Objects.nonNull(event) && !(ignoreHeartbeat && event instanceof PipeHeartbeatEvent)
? 1
: 0;
}

//////////////////////////// Error report ////////////////////////////

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ protected boolean executeOnce() {
e);
} else {
LOGGER.info(
"Exception in pipe transfer, ignored because the connector subtask is dropped.{}",
"Exception in pipe transfer, ignored because the sink subtask is dropped.{}",
e.getMessage() != null ? " Message: " + e.getMessage() : "");
clearReferenceCountAndReleaseLastEvent(event);
}
Expand Down
Loading