Archived
1
0
Fork 0

Fix structured concurrency

This commit is contained in:
Moritz Ruth 2021-01-09 17:44:13 +01:00
parent e1651806ef
commit 1b98b19f16
No known key found for this signature in database
GPG key ID: AFD57E23E753841B
9 changed files with 51 additions and 77 deletions

View file

@ -6,7 +6,10 @@
package space.uranos.net package space.uranos.net
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import space.uranos.Position import space.uranos.Position
import space.uranos.Uranos
import space.uranos.chat.TextComponent import space.uranos.chat.TextComponent
import space.uranos.event.EventBusWrapper import space.uranos.event.EventBusWrapper
import space.uranos.net.packet.OutgoingPacket import space.uranos.net.packet.OutgoingPacket
@ -16,21 +19,20 @@ import space.uranos.player.Player
import space.uranos.world.World import space.uranos.world.World
import java.net.InetAddress import java.net.InetAddress
import java.util.* import java.util.*
import kotlin.coroutines.CoroutineContext
abstract class Session { abstract class Session {
val events by lazy { EventBusWrapper<Session>(this) } val events by lazy { EventBusWrapper<Session>(this) }
/**
* A coroutine scope that is cancelled when the session disconnected.
*/
val scope = CoroutineScope(SupervisorJob() + Uranos.dispatcher)
/** /**
* The IP address of this session * The IP address of this session
*/ */
abstract val address: InetAddress abstract val address: InetAddress
/**
* Unconfined [CoroutineContext] which is cancelled when the session is disconnected.
*/
abstract val coroutineContext: CoroutineContext
/** /**
* The brand name the client optionally sent during the login procedure. * The brand name the client optionally sent during the login procedure.
* [ClientBrandReceivedEvent][space.uranos.net.event.ClientBrandReceivedEvent] is emitted when this value changes. * [ClientBrandReceivedEvent][space.uranos.net.event.ClientBrandReceivedEvent] is emitted when this value changes.
@ -48,7 +50,6 @@ abstract class Session {
* The player corresponding to this session. * The player corresponding to this session.
*/ */
val player: Player? get() = (state as? State.Playing)?.player val player: Player? get() = (state as? State.Playing)?.player
// val player: Player? get() = (state as? State.WithPlayer)?.player
/** /**
* The current state of this session. * The current state of this session.

View file

@ -11,10 +11,9 @@ import space.uranos.net.Session
import space.uranos.world.Chunk import space.uranos.world.Chunk
import space.uranos.world.VoxelLocation import space.uranos.world.VoxelLocation
import java.util.* import java.util.*
import kotlin.coroutines.CoroutineContext
interface Player { interface Player {
val coroutineContext: CoroutineContext get() = session.coroutineContext val scope get() = session.scope
/** /**
* The session of this player. * The session of this player.

View file

@ -5,7 +5,9 @@
package space.uranos.server package space.uranos.server
import space.uranos.CalledFromWrongThread import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import space.uranos.Registry import space.uranos.Registry
import space.uranos.Scheduler import space.uranos.Scheduler
import space.uranos.command.Command import space.uranos.command.Command
@ -18,25 +20,26 @@ import space.uranos.net.Session
import space.uranos.player.Player import space.uranos.player.Player
import space.uranos.plugin.PluginManager import space.uranos.plugin.PluginManager
import space.uranos.recipe.Recipe import space.uranos.recipe.Recipe
import space.uranos.util.newSingleThreadDispatcher
import space.uranos.world.BiomeRegistry import space.uranos.world.BiomeRegistry
import space.uranos.world.Dimension import space.uranos.world.Dimension
import java.io.File import java.io.File
import java.util.* import java.util.*
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext
abstract class Server { abstract class Server {
abstract val eventBus: EventBus abstract val eventBus: EventBus
abstract val eventHandlerPositions: EventHandlerPositionManager abstract val eventHandlerPositions: EventHandlerPositionManager
protected abstract val serverThread: Thread /**
* A coroutine dispatcher that is confined to the server thread.
*/
val dispatcher: CoroutineDispatcher = newSingleThreadDispatcher("Server")
/** /**
* [CoroutineContext] confined to the server thread. * A coroutine scope that is cancelled when the server is shutdown.
*
* Is cancelled when the server is shutting down.
*/ */
abstract val coroutineContext: CoroutineContext val scope = CoroutineScope(SupervisorJob() + dispatcher)
/** /**
* All sessions connected to the server. * All sessions connected to the server.
@ -68,13 +71,6 @@ abstract class Server {
*/ */
abstract fun shutdown() abstract fun shutdown()
/**
* Throws [CalledFromWrongThread] when called from a thread which is not the server thread.
*/
fun ensureServerThread(errorMessage: String) {
if (Thread.currentThread() != serverThread) throw CalledFromWrongThread(errorMessage)
}
/** /**
* Set of all existing [Entity] instances. * Set of all existing [Entity] instances.
* *

View file

@ -8,4 +8,4 @@ package space.uranos.util
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import space.uranos.Uranos import space.uranos.Uranos
suspend fun <T> runInServerThread(block: suspend () -> T): T = withContext(Uranos.coroutineContext) { block() } suspend fun <T> runInServerThread(block: suspend () -> T): T = withContext(Uranos.dispatcher) { block() }

View file

@ -5,10 +5,10 @@
package space.uranos.world package space.uranos.world
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import space.uranos.Uranos import space.uranos.Uranos
import space.uranos.player.Player import space.uranos.player.Player
import space.uranos.util.supervisorChild
import kotlin.coroutines.CoroutineContext
import kotlin.math.floor import kotlin.math.floor
abstract class Chunk( abstract class Chunk(
@ -40,11 +40,9 @@ abstract class Chunk(
} }
/** /**
* [CoroutineContext] confined to the world thread. * A coroutine scope confined to the world's dispatcher that is cancelled when the chunk is unloaded.
*
* Is cancelled when the chunk is unloaded.
*/ */
val coroutineContext: CoroutineContext = world.coroutineContext.supervisorChild(identifier) val scope = CoroutineScope(SupervisorJob() + world.dispatcher)
/** /**
* A list of all players who have locked this chunk. * A list of all players who have locked this chunk.

View file

@ -5,33 +5,31 @@
package space.uranos.world package space.uranos.world
import kotlinx.coroutines.cancel import kotlinx.coroutines.*
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import space.uranos.Uranos
import space.uranos.Vector import space.uranos.Vector
import space.uranos.entity.Entity import space.uranos.entity.Entity
import space.uranos.util.newSingleThreadDispatcher import space.uranos.util.newSingleThreadDispatcher
import space.uranos.util.supervisorChild
import space.uranos.util.untilPossiblyLower import space.uranos.util.untilPossiblyLower
import java.util.* import java.util.*
import java.util.concurrent.CopyOnWriteArraySet import java.util.concurrent.CopyOnWriteArraySet
import kotlin.coroutines.CoroutineContext
/** /**
* A Minecraft world. * A Minecraft world.
*/ */
abstract class World(val uuid: UUID) { abstract class World(val uuid: UUID) {
private val identifier = "World($uuid)" private val identifier = "World($uuid)"
private val threadExecutor = newSingleThreadDispatcher(identifier)
private val internalDispatcher = newSingleThreadDispatcher(identifier)
/** /**
* [CoroutineContext] confined to the world thread. * A coroutine dispatcher that is confined to the server thread.
*
* Is cancelled when the world is unloaded.
*/ */
val coroutineContext: CoroutineContext = Uranos.coroutineContext.supervisorChild(identifier) + threadExecutor val dispatcher: CoroutineDispatcher = internalDispatcher
/**
* A coroutine scope confined to [dispatcher] that is cancelled when the world is destroyed.
*/
val scope = CoroutineScope(SupervisorJob() + dispatcher)
abstract val dimension: Dimension abstract val dimension: Dimension
abstract val loadedChunks: Map<Chunk.Key, Chunk> abstract val loadedChunks: Map<Chunk.Key, Chunk>
@ -104,14 +102,14 @@ abstract class World(val uuid: UUID) {
} }
} }
suspend fun destroy() = withContext(coroutineContext) { suspend fun destroy() {
// TODO: Move or kick players // TODO: Move or kick players
coroutineContext.cancel() scope.cancel()
coroutineScope { coroutineScope {
loadedChunks.values.forEach { launch { (it as? Chunk.Unloadable)?.unload() } } loadedChunks.values.forEach { launch { (it as? Chunk.Unloadable)?.unload() } }
} }
threadExecutor.close() internalDispatcher.close()
} }
} }

View file

@ -8,9 +8,6 @@ package space.uranos
import com.sksamuel.hoplite.ConfigFilePropertySource import com.sksamuel.hoplite.ConfigFilePropertySource
import com.sksamuel.hoplite.ConfigLoader import com.sksamuel.hoplite.ConfigLoader
import com.sksamuel.hoplite.ConfigSource import com.sksamuel.hoplite.ConfigSource
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import space.uranos.command.Command import space.uranos.command.Command
import space.uranos.config.UranosConfig import space.uranos.config.UranosConfig
@ -26,13 +23,11 @@ import space.uranos.recipe.Recipe
import space.uranos.server.Server import space.uranos.server.Server
import space.uranos.util.EncryptionUtils import space.uranos.util.EncryptionUtils
import space.uranos.util.msToTicks import space.uranos.util.msToTicks
import space.uranos.util.runInServerThread
import space.uranos.world.BiomeRegistry import space.uranos.world.BiomeRegistry
import space.uranos.world.Dimension import space.uranos.world.Dimension
import java.io.File import java.io.File
import java.security.KeyPair import java.security.KeyPair
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext
import kotlin.system.exitProcess import kotlin.system.exitProcess
// TODO: Consider using DI because this improves testability // TODO: Consider using DI because this improves testability
@ -50,13 +45,6 @@ class UranosServer internal constructor() : Server() {
val x509EncodedPublicKey: ByteArray = EncryptionUtils.generateX509Key(keyPair.public).encoded val x509EncodedPublicKey: ByteArray = EncryptionUtils.generateX509Key(keyPair.public).encoded
override lateinit var serverThread: Thread
private val scheduledExecutorService: ExecutorService =
Executors.newSingleThreadExecutor { r -> Thread(r, "Server").also { serverThread = it } }
override val coroutineContext: CoroutineContext =
CoroutineName("Server") + SupervisorJob() + scheduledExecutorService.asCoroutineDispatcher()
override val sessions by socketServer::sessions override val sessions by socketServer::sessions
override val players get() = sessions.mapNotNull { it.player as UranosPlayer? } override val players get() = sessions.mapNotNull { it.player as UranosPlayer? }
@ -124,6 +112,7 @@ class UranosServer internal constructor() : Server() {
private fun startTicking() { private fun startTicking() {
scheduler.executeRepeating(1, 0) { scheduler.executeRepeating(1, 0) {
runInServerThread {
players.forEach { it.container.tick() } players.forEach { it.container.tick() }
entities.forEach { entities.forEach {
@ -134,6 +123,7 @@ class UranosServer internal constructor() : Server() {
sessions.forEach { it.packetsAdapter.tick() } sessions.forEach { it.packetsAdapter.tick() }
} }
} }
}
private fun startPingSync() { private fun startPingSync() {
scheduler.executeRepeating(msToTicks(config.pingUpdateInterval.toMillis()), 0) { scheduler.executeRepeating(msToTicks(config.pingUpdateInterval.toMillis()), 0) {

View file

@ -9,7 +9,6 @@ import io.netty.buffer.ByteBuf
import io.netty.util.ReferenceCountUtil import io.netty.util.ReferenceCountUtil
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import space.uranos.event.ifNotCancelled import space.uranos.event.ifNotCancelled
import space.uranos.net.MinecraftProtocolDataTypes.readVarInt import space.uranos.net.MinecraftProtocolDataTypes.readVarInt
import space.uranos.net.event.PacketReceivedEvent import space.uranos.net.event.PacketReceivedEvent
@ -22,7 +21,7 @@ import space.uranos.util.awaitSuspending
class PacketsAdapter(val session: UranosSession) { class PacketsAdapter(val session: UranosSession) {
private val packetsForNextTick = ArrayList<OutgoingPacket>() private val packetsForNextTick = ArrayList<OutgoingPacket>()
suspend fun tick() = withContext(session.coroutineContext) { suspend fun tick() {
packetsForNextTick.forEach { send(it) } packetsForNextTick.forEach { send(it) }
packetsForNextTick.clear() packetsForNextTick.clear()
} }
@ -44,14 +43,14 @@ class PacketsAdapter(val session: UranosSession) {
} }
} }
suspend fun sendNextTick(packet: OutgoingPacket) = withContext(session.coroutineContext) { fun sendNextTick(packet: OutgoingPacket) {
if (packet is Mergeable) { if (packet is Mergeable) {
for (i in packetsForNextTick.indices.reversed()) { for (i in packetsForNextTick.indices.reversed()) {
val merged = packet.mergeWith(packetsForNextTick[i]) val merged = packet.mergeWith(packetsForNextTick[i])
if (merged != null) { if (merged != null) {
packetsForNextTick[i] = merged packetsForNextTick[i] = merged
return@withContext return
} }
} }
} }
@ -59,7 +58,7 @@ class PacketsAdapter(val session: UranosSession) {
packetsForNextTick.add(packet) packetsForNextTick.add(packet)
} }
suspend fun send(packet: OutgoingPacket): Unit = withContext(session.coroutineContext) { suspend fun send(packet: OutgoingPacket) {
if (session.server.config.logging.shouldLog(packet)) session.logger.trace { "Sending packet: $packet" } if (session.server.config.logging.shouldLog(packet)) session.logger.trace { "Sending packet: $packet" }
session.server.eventBus.emit(PacketSendEvent(session, packet)).ifNotCancelled { session.server.eventBus.emit(PacketSendEvent(session, packet)).ifNotCancelled {

View file

@ -23,11 +23,9 @@ import space.uranos.net.packet.play.PlayProtocol
import space.uranos.net.packet.play.PlayerInfoPacket import space.uranos.net.packet.play.PlayerInfoPacket
import space.uranos.net.packet.status.StatusProtocol import space.uranos.net.packet.status.StatusProtocol
import space.uranos.server.event.SessionInitializedEvent import space.uranos.server.event.SessionInitializedEvent
import space.uranos.util.supervisorChild
import java.net.InetAddress import java.net.InetAddress
import java.net.InetSocketAddress import java.net.InetSocketAddress
import javax.crypto.SecretKey import javax.crypto.SecretKey
import kotlin.coroutines.CoroutineContext
import kotlin.properties.Delegates import kotlin.properties.Delegates
class UranosSession(val channel: io.netty.channel.Channel, val server: UranosServer) : Session() { class UranosSession(val channel: io.netty.channel.Channel, val server: UranosServer) : Session() {
@ -36,9 +34,6 @@ class UranosSession(val channel: io.netty.channel.Channel, val server: UranosSer
private val identifier = "UranosSession(${address.hostAddress})" private val identifier = "UranosSession(${address.hostAddress})"
val logger = Logger(identifier) val logger = Logger(identifier)
override val coroutineContext: CoroutineContext = server.coroutineContext.supervisorChild(identifier)
val scope = CoroutineScope(coroutineContext)
override var brand: String? = null override var brand: String? = null
override var ping: Int = -1 override var ping: Int = -1
@ -75,9 +70,7 @@ class UranosSession(val channel: io.netty.channel.Channel, val server: UranosSer
val packetsAdapter = PacketsAdapter(this) val packetsAdapter = PacketsAdapter(this)
override suspend fun send(packet: OutgoingPacket) = packetsAdapter.send(packet) override suspend fun send(packet: OutgoingPacket) = packetsAdapter.send(packet)
override fun sendNextTick(packet: OutgoingPacket) { override fun sendNextTick(packet: OutgoingPacket) = packetsAdapter.sendNextTick(packet)
scope.launch { packetsAdapter.sendNextTick(packet) }
}
override suspend fun sendPluginMessage(channel: String, data: ByteBuf) { override suspend fun sendPluginMessage(channel: String, data: ByteBuf) {
if (this.currentProtocol != PlayProtocol) throw IllegalStateException("The session is not using the PLAY protocol") if (this.currentProtocol != PlayProtocol) throw IllegalStateException("The session is not using the PLAY protocol")