Cancellations in Kotlin Flow

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now

Cancellations in Kotlin Flow

In this lesson, you’ll learn about handling cancellations in Kotlin Flow. This is important for managing your app’s resources efficiently and ensuring it isn’t doing any unnecessary work.

Cancellation and Coroutine Scope

Cancellation in Kotlin Flow is closely tied to its coroutine context. A flow built within a coroutine is automatically canceled when the coroutine’s scope is canceled. This behavior ensures that flow-related resources are released when they’re no longer needed, preventing potential memory leaks and performance issues.

val factoryScope = CoroutineScope(Dispatchers.Default)

fun carrots(): Flow<Carrot> = fetchNewBatches().asFlow()
  .flatMapLatest { batch -> batch.carrots.asFlow() }

fun processCarrots() {
  factoryScope.launch {
    val firstProcessingLine = scope.launch {
      carrots()
        .map { carrot -> processCarrot() }
        .collect { carrot ->
          println("Processing carrots on Line 1: $type")
        }
    }

    val secondProcessingLine = scope.launch {
      carrots()
        .map { carrot -> processCarrot() }
        .collect { carrot ->
          println("Processing carrots on Line 2: $type")
        }
    }
    
    delay(5000) // During this period you notice that the second line is not needed
    
    secondProcessingLine.cancel() // Cancel the second processing line
  }
}

fun shutdownFactory() {
  factoryScope.cancel() // Cancel the factory scope
}

The timeout Operator

The timeout operator limits the time allowed for an operation within a flow. If the specified timeout is exceeded, the operator throws a TimeoutCancellationException, which can be handled like any other exception in coroutines.

val factoryScope = CoroutineScope(Dispatchers.Default)

fun carrots(): Flow<Carrot> = fetchNewBatches().asFlow()
  .flatMapLatest { batch -> batch.carrots.asFlow() }

fun processCarrots() {
  factoryScope.launch {
    val firstProcessingLine = scope.launch {
      carrots()
        .map { carrot -> processCarrot() }
        .timeout(1000) {                                // Set a timeout of 1 second
          throw TimeoutCancellationException("Carrot processing timed out. Check processing line.")
        }
        .collect { carrot ->
          println("Processing carrots on Line 1: $type")
        }
        .catch { e -> 
          // Check processing line
        }
    }

    val secondProcessingLine = scope.launch {
      carrots()
        .map { carrot -> processCarrot() }
        .timeout(1000) {                                // Set a timeout of 1 second
          throw TimeoutCancellationException("Carrot processing timed out. Check processing line.")
        }
        .collect { carrot ->
          println("Processing carrots on Line 2: $type")
        }
        .catch { e ->
          // Check processing line
        }
    }

    delay(5000) // During this period you notice that the second line is not needed

    secondProcessingLine.cancel() // Cancel the second processing line
  }
}

fun shutdownFactory() {
  factoryScope.cancel() // Cancel the factory scope
}

Wrap-Up

In this lesson, you’ve learned about cancellations in Kotlin Flow, particularly how cancellation is managed through coroutine scopes and how the timeout operator can be used to enforce time limits on flow operations.

See forum comments
Download course materials from Github
Previous: Flow Error Handling Demo Next: Timeout Operator Demo