/*
 * Copyright @ 2021-present 8x8, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import Foundation

/// Client end of an `AF_UNIX` stream socket, exposed as a Foundation input/output stream pair.
///
/// The socket is created in `init`, connected to the socket file at `filePath` by `open()`,
/// and then wrapped in streams that are serviced by a run loop on a background queue.
/// Stream events are reported through the `didOpen`, `didClose` and
/// `streamHasSpaceAvailable` callbacks, which are invoked from the stream delegate callback.
class SocketConnection: NSObject {
    /// Called when the output stream reports `.openCompleted`.
    var didOpen: (() -> Void)?
    /// Called after the connection was torn down because the peer closed it (`nil` error)
    /// or because a stream reported an error.
    var didClose: ((Error?) -> Void)?
    /// Called when the output stream reports `.hasSpaceAvailable`.
    var streamHasSpaceAvailable: (() -> Void)?

    private let filePath: String
    /// Native descriptor; reset to -1 once the streams have taken over closing it.
    private var socketHandle: Int32 = -1
    private var address: sockaddr_un?

    private var inputStream: InputStream?
    private var outputStream: OutputStream?

    private var networkQueue: DispatchQueue?
    private var shouldKeepRunning = false
    /// Run loop the streams are scheduled on; recorded so they can be removed from the same loop.
    private var streamRunLoop: RunLoop?

    /// Creates the underlying socket.
    /// - Parameter path: File system path of the Unix-domain socket to connect to.
    /// - Returns: `nil` when the socket could not be created.
    init?(filePath path: String) {
        filePath = path
        socketHandle = Darwin.socket(AF_UNIX, SOCK_STREAM, 0)

        guard socketHandle != -1 else {
            print("failure: create socket")
            return nil
        }
    }

    deinit {
        // Once the streams own the descriptor `socketHandle` is -1; otherwise it is still ours
        // to release (e.g. `open()` was never called or failed before the streams were created).
        if socketHandle != -1 {
            Darwin.close(socketHandle)
        }
    }

    /// Connects to the socket file and opens both streams.
    /// - Returns: `false` when the socket file is missing, the path does not fit into
    ///   `sockaddr_un`, or `connect` fails; `true` otherwise.
    func open() -> Bool {
        print("open socket connection")

        guard FileManager.default.fileExists(atPath: filePath) else {
            print("failure: socket file missing")
            return false
        }

        guard setupAddress(), connectSocket() else {
            return false
        }

        setupStreams()

        inputStream?.open()
        outputStream?.open()

        return true
    }

    /// Unschedules, detaches and closes both streams, then drops the references to them.
    func close() {
        unscheduleStreams()

        inputStream?.delegate = nil
        outputStream?.delegate = nil

        inputStream?.close()
        outputStream?.close()

        inputStream = nil
        outputStream = nil
    }

    /// Writes up to `length` bytes from `buffer` to the output stream.
    /// - Returns: The value returned by `OutputStream.write`, or 0 when there is no output stream.
    func writeToStream(buffer: UnsafePointer<UInt8>, maxLength length: Int) -> Int {
        return outputStream?.write(buffer, maxLength: length) ?? 0
    }
}

// MARK: - StreamDelegate

extension SocketConnection: StreamDelegate {
    /// Translates stream events into the public callbacks.
    func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
        switch eventCode {
        case .openCompleted:
            print("client stream open completed")
            if aStream == outputStream {
                didOpen?()
            }
        case .hasBytesAvailable:
            if aStream == inputStream {
                // A zero-length read combined with `.atEnd` is treated as the peer closing.
                var buffer: UInt8 = 0
                let numberOfBytesRead = inputStream?.read(&buffer, maxLength: 1)
                if numberOfBytesRead == 0 && aStream.streamStatus == .atEnd {
                    print("server socket closed")
                    close()
                    notifyDidClose(error: nil)
                }
            }
        case .hasSpaceAvailable:
            if aStream == outputStream {
                streamHasSpaceAvailable?()
            }
        case .errorOccurred:
            // Capture the error before tearing the stream down.
            let error = aStream.streamError
            print("client stream error occurred: \(String(describing: error))")
            close()
            notifyDidClose(error: error)
        default:
            break
        }
    }
}

// MARK: - Private helpers

private extension SocketConnection {
    /// Builds the `sockaddr_un` for `filePath`.
    /// - Returns: `false` when the UTF-8 path does not fit into `sun_path` with its terminator.
    func setupAddress() -> Bool {
        var addr = sockaddr_un()
        addr.sun_family = sa_family_t(AF_UNIX)

        // The limit is in bytes, so measure the UTF-8 encoding rather than the character count.
        let pathBytes = filePath.utf8
        guard pathBytes.count < MemoryLayout.size(ofValue: addr.sun_path) else {
            print("failure: fd path is too long")
            return false
        }

        // `addr` is zero-initialised, so the copied path stays NUL-terminated.
        withUnsafeMutableBytes(of: &addr.sun_path) { rawPath in
            rawPath.copyBytes(from: pathBytes)
        }

        address = addr
        return true
    }

    /// Connects the socket to the address prepared by `setupAddress()`.
    /// - Returns: `false` when no address is set or `connect` reports an error.
    func connectSocket() -> Bool {
        guard var addr = address else {
            return false
        }

        let status = withUnsafePointer(to: &addr) { ptr in
            ptr.withMemoryRebound(to: sockaddr.self, capacity: 1) {
                Darwin.connect(socketHandle, $0, socklen_t(MemoryLayout<sockaddr_un>.size))
            }
        }

        guard status == 0 else {
            print("failure: \(status)")
            return false
        }

        return true
    }

    /// Wraps the connected socket in a stream pair, makes this object their delegate and
    /// schedules them on the background run loop.
    func setupStreams() {
        var readStream: Unmanaged<CFReadStream>?
        var writeStream: Unmanaged<CFWriteStream>?

        CFStreamCreatePairWithSocket(kCFAllocatorDefault, socketHandle, &readStream, &writeStream)

        let closeSocketKey = Stream.PropertyKey(kCFStreamPropertyShouldCloseNativeSocket as String)

        inputStream = readStream?.takeRetainedValue()
        inputStream?.delegate = self
        let inputOwnsSocket = inputStream?.setProperty(kCFBooleanTrue, forKey: closeSocketKey) ?? false

        outputStream = writeStream?.takeRetainedValue()
        outputStream?.delegate = self
        let outputOwnsSocket = outputStream?.setProperty(kCFBooleanTrue, forKey: closeSocketKey) ?? false

        // The streams now close the descriptor themselves; forget it so `deinit` does not close it twice.
        if inputOwnsSocket || outputOwnsSocket {
            socketHandle = -1
        }

        scheduleStreams()
    }

    /// Schedules both streams on the run loop of a background queue's thread and keeps that
    /// run loop spinning until `unscheduleStreams()` clears `shouldKeepRunning`.
    func scheduleStreams() {
        shouldKeepRunning = true

        networkQueue = DispatchQueue.global(qos: .userInitiated)
        networkQueue?.async { [weak self] in
            let runLoop = RunLoop.current
            self?.streamRunLoop = runLoop
            self?.inputStream?.schedule(in: runLoop, forMode: .common)
            self?.outputStream?.schedule(in: runLoop, forMode: .common)

            var isRunning = false
            repeat {
                isRunning = (self?.shouldKeepRunning ?? false)
                    && runLoop.run(mode: .default, before: .distantFuture)
            } while isRunning
        }
    }

    /// Removes both streams from the run loop they were scheduled on and asks that loop to stop.
    func unscheduleStreams() {
        shouldKeepRunning = false

        guard let runLoop = streamRunLoop else {
            return
        }
        streamRunLoop = nil

        // Use the recorded loop: this may be called from a thread other than the one servicing
        // the streams, where `RunLoop.current` would be a different run loop.
        inputStream?.remove(from: runLoop, forMode: .common)
        outputStream?.remove(from: runLoop, forMode: .common)

        // Wake the (possibly blocked) loop so the servicing closure can observe `shouldKeepRunning`.
        CFRunLoopStop(runLoop.getCFRunLoop())
    }

    /// Forwards `error` to the `didClose` callback, if one is set.
    func notifyDidClose(error: Error?) {
        didClose?(error)
    }
}