Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- Don't start a redundant UI interaction transaction when a transaction is already bound to the Scope ([#5491](https://github.com/getsentry/sentry-java/issues/5491))
- Previously, `SentryGestureListener` always started a UI transaction and only afterwards skipped binding it to the Scope when a manually-bound transaction already existed, leaving the new transaction to be dropped as an idle transaction without children.
- Fix potential NPE within `Scope.endSession()` ([#5657](https://github.com/getsentry/sentry-java/pull/5657))
- Fix memory leak in `ReplayIntegration` due to persisting executor not being shut down ([#5627](https://github.com/getsentry/sentry-java/pull/5627))

### Performance

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ public class ReplayIntegration(
val delegate = Executors.newSingleThreadScheduledExecutor(ReplayExecutorServiceThreadFactory())
ReplayExecutorService(delegate, options)
}
private val persistingExecutor by lazy {
val delegate =
Executors.newSingleThreadScheduledExecutor(ReplayPersistingExecutorServiceThreadFactory())
ReplayExecutorService(delegate, options)
}

internal val isEnabled = AtomicBoolean(false)
internal val isManualPause = AtomicBoolean(false)
Expand Down Expand Up @@ -192,6 +197,7 @@ public class ReplayIntegration(
scopes,
dateProvider,
replayExecutor,
persistingExecutor,
replayCacheProvider,
)
} else {
Expand All @@ -201,6 +207,7 @@ public class ReplayIntegration(
dateProvider,
random,
replayExecutor,
persistingExecutor,
replayCacheProvider,
)
}
Expand Down Expand Up @@ -373,7 +380,8 @@ public class ReplayIntegration(
recorder?.close()
recorder = null
rootViewsSpy.close()
replayExecutor.shutdown()
replayExecutor.shutdownNow()
persistingExecutor.shutdownNow()
lifecycle.currentState = CLOSED
}
}
Expand Down Expand Up @@ -554,4 +562,14 @@ public class ReplayIntegration(
return ret
}
}

private class ReplayPersistingExecutorServiceThreadFactory : ThreadFactory {
private var cnt = 0

override fun newThread(r: Runnable): Thread {
val ret = Thread(r, "SentryReplayPersister-" + cnt++)
ret.setDaemon(true)
return ret
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import io.sentry.android.replay.ScreenshotRecorderConfig
import io.sentry.android.replay.capture.CaptureStrategy.Companion.createSegment
import io.sentry.android.replay.capture.CaptureStrategy.ReplaySegment
import io.sentry.android.replay.gestures.ReplayGestureConverter
import io.sentry.android.replay.util.ReplayExecutorService
import io.sentry.android.replay.util.ReplayRunnable
import io.sentry.protocol.SentryId
import io.sentry.rrweb.RRWebEvent
Expand All @@ -34,9 +33,7 @@ import java.io.File
import java.util.Date
import java.util.Deque
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
Expand All @@ -50,6 +47,7 @@ internal abstract class BaseCaptureStrategy(
private val scopes: IScopes?,
private val dateProvider: ICurrentDateProvider,
protected val replayExecutor: ScheduledExecutorService,
protected val persistingExecutor: ScheduledExecutorService,
private val replayCacheProvider: ((replayId: SentryId) -> ReplayCache)? = null,
) : CaptureStrategy {
internal companion object {
Expand All @@ -58,11 +56,6 @@ internal abstract class BaseCaptureStrategy(
private const val MAX_TRACE_IDS = 100
}

private val persistingExecutor: ScheduledExecutorService by lazy {
val delegate =
Executors.newSingleThreadScheduledExecutor(ReplayPersistingExecutorServiceThreadFactory())
ReplayExecutorService(delegate, options)
}
private val gestureConverter = ReplayGestureConverter(dateProvider)

protected val isTerminating = AtomicBoolean(false)
Expand Down Expand Up @@ -192,16 +185,6 @@ internal abstract class BaseCaptureStrategy(
}
}

private class ReplayPersistingExecutorServiceThreadFactory : ThreadFactory {
private var cnt = 0

override fun newThread(r: Runnable): Thread {
val ret = Thread(r, "SentryReplayPersister-" + cnt++)
ret.setDaemon(true)
return ret
}
}

private inline fun <T> persistableAtomicNullable(
initialValue: T? = null,
propertyName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ internal class BufferCaptureStrategy(
private val dateProvider: ICurrentDateProvider,
private val random: Random,
executor: ScheduledExecutorService,
persistingExecutor: ScheduledExecutorService,
replayCacheProvider: ((replayId: SentryId) -> ReplayCache)? = null,
) :
BaseCaptureStrategy(
options,
scopes,
dateProvider,
executor,
persistingExecutor,
replayCacheProvider = replayCacheProvider,
) {
// TODO: capture envelopes for buffered segments instead, but don't send them until buffer is
Expand Down Expand Up @@ -150,8 +152,10 @@ internal class BufferCaptureStrategy(
)
return this
}
// we hand over replayExecutor to the new strategy to preserve order of execution
val captureStrategy = SessionCaptureStrategy(options, scopes, dateProvider, replayExecutor)
// we hand over replayExecutor and persistingExecutor to the new strategy to preserve order of
// execution
val captureStrategy =
SessionCaptureStrategy(options, scopes, dateProvider, replayExecutor, persistingExecutor)
captureStrategy.recorderConfig = recorderConfig
captureStrategy.start(
segmentId = currentSegment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,17 @@ internal class SessionCaptureStrategy(
private val scopes: IScopes?,
private val dateProvider: ICurrentDateProvider,
executor: ScheduledExecutorService,
persistingExecutor: ScheduledExecutorService,
replayCacheProvider: ((replayId: SentryId) -> ReplayCache)? = null,
) : BaseCaptureStrategy(options, scopes, dateProvider, executor, replayCacheProvider) {
) :
BaseCaptureStrategy(
options,
scopes,
dateProvider,
executor,
persistingExecutor,
replayCacheProvider,
) {
internal companion object {
private const val TAG = "SessionCaptureStrategy"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,12 @@ class ReplayIntegrationTest {
null
}
},
mock {
whenever(mock.submit(any<Runnable>())).doAnswer {
(it.arguments[0] as Runnable).run()
null
}
},
) { _ ->
fixture.replayCache
}
Expand Down Expand Up @@ -1104,6 +1110,32 @@ class ReplayIntegrationTest {
assertEquals(traceId, traceIdRegistered)
}

@Test
fun `close shuts down persisting executor so no SentryReplayPersister threads leak`() {
fixture.options.cacheDirPath = tmpDir.newFolder().absolutePath
fixture.options.threadChecker = mock {
on { isMainThread }.thenReturn(true)
on { isMainThread(any<Thread>()) }.thenReturn(true)
}

repeat(3) {
val replay = fixture.getSut(context)
replay.register(fixture.scopes, fixture.options)
replay.start()
replay.stop()
replay.close()
}

Thread.sleep(200)

val leakedThreads =
Thread.getAllStackTraces().keys.filter { it.name.startsWith("SentryReplayPersister-") }
assertTrue(
leakedThreads.isEmpty(),
"Expected no SentryReplayPersister threads after close(), found: ${leakedThreads.map { it.name }}",
)
}

private fun getSessionCaptureStrategy(options: SentryOptions): SessionCaptureStrategy =
SessionCaptureStrategy(
options,
Expand All @@ -1116,5 +1148,12 @@ class ReplayIntegrationTest {
null
}
},
persistingExecutor =
mock {
whenever(mock.submit(any<Runnable>())).doAnswer {
(it.arguments[0] as Runnable).run()
null
}
},
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ class BufferCaptureStrategyTest {
null
}
},
mock {
whenever(it.submit(any<Runnable>())).doAnswer { invocation ->
(invocation.arguments[0] as Runnable).run()
null
}
},
) { _ ->
replayCache
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ class SessionCaptureStrategyTest {
.whenever(it)
.submit(any<Runnable>())
},
mock {
doAnswer { invocation ->
(invocation.arguments[0] as Runnable).run()
null
}
.whenever(it)
.submit(any<Runnable>())
},
) { _ ->
replayCache
}
Expand Down
Loading