From: Pat Thoyts Date: Thu, 14 Oct 2010 23:59:55 +0000 (+0100) Subject: Implemented fileevent support X-Git-Url: http://privyetmir.co.uk/gitweb.cgi?a=commitdiff_plain;h=557e9a03a2640ba0307103737281dacaf56f6cf8;p=tclstorage Implemented fileevent support Added tests for readable fileevent and asynchronous fcopy. Signed-off-by: Pat Thoyts --- diff --git a/tclstorage.c b/tclstorage.c index 33d9f62..a7763f3 100644 --- a/tclstorage.c +++ b/tclstorage.c @@ -73,6 +73,7 @@ extern Tcl_ObjCmdProc PropertySetNamesCmd; static long UNIQUEID = 0; +static Tcl_InterpDeleteProc PackageDeleteProc; static int GetItemInfo(Tcl_Interp *interp, IStorage *pstg, Tcl_Obj *pathObj, STATSTG *pstatstg); static void TimeToFileTime(time_t t, LPFILETIME pft); @@ -87,14 +88,40 @@ static Tcl_DriverWatchProc StorageChannelWatch; static Tcl_DriverGetHandleProc StorageChannelGetHandle; static Tcl_DriverWideSeekProc StorageChannelWideSeek; -typedef struct { +static int EventProc(Tcl_Event *evPtr, int flags); +static void SetupProc(ClientData clientData, int flags); +static void CheckProc(ClientData clientData, int flags); + +#define STORAGE_PACKAGE_KEY "StoragePackageKey" +#define STORAGE_FLAG_ASYNC (1<<1) +#define STORAGE_FLAG_PENDING (1<<2) + +struct Package; + +typedef struct StorageChannel { + Tcl_Channel chan; + struct Package *pkgPtr; + struct StorageChannel *nextPtr; Tcl_Interp *interp; DWORD grfMode; int watchmask; int validmask; + int flags; IStream *pstm; } StorageChannel; +typedef struct Package { + struct StorageChannel *headPtr; + unsigned long count; + unsigned long uid; +} Package; + +typedef struct ChannelEvent { + Tcl_Event header; + StorageChannel *instPtr; + int flags; +} ChannelEvent; + static Tcl_ChannelType StorageChannelType = { "storage", (Tcl_ChannelTypeVersion)TCL_CHANNEL_VERSION_2, @@ -214,13 +241,19 @@ int Storage_Init(Tcl_Interp *interp) { EnsembleCmdData *dataPtr; - -#ifdef USE_TCL_STUBS + Package *pkgPtr; + if (Tcl_InitStubs(interp, "8.2", 0) == NULL) { return TCL_ERROR; } -#endif + pkgPtr = (Package *)ckalloc(sizeof(Package)); + pkgPtr->headPtr = NULL; + pkgPtr->count = 0; + pkgPtr->uid = 0; + Tcl_CreateEventSource(SetupProc, CheckProc, pkgPtr); + Tcl_SetAssocData(interp, STORAGE_PACKAGE_KEY, PackageDeleteProc, pkgPtr); + dataPtr = (EnsembleCmdData *)ckalloc(sizeof(EnsembleCmdData)); dataPtr->ensemble = StorageEnsemble; dataPtr->clientData = NULL; @@ -275,6 +308,30 @@ StorageCmdDeleteProc(ClientData clientData) ckfree((char *)data); } +/* + * ---------------------------------------------------------------------- + * + * PackageDeleteProc - + * + * Clean up the allocated memory associated with the package. + * + * Results: + * None. + * + * Side effects: + * Memory free'd and the event source removed. + * + * ---------------------------------------------------------------------- + */ + +static void +PackageDeleteProc(ClientData clientData, Tcl_Interp *interp) +{ + Package *pkgPtr = clientData; + Tcl_DeleteEventSource(SetupProc, CheckProc, pkgPtr); + ckfree((char *)pkgPtr); +} + /* * ---------------------------------------------------------------------- * @@ -658,9 +715,9 @@ StorageOpenCmd(ClientData clientData, Tcl_Interp *interp, Tcl_SetObjResult(interp, errObj); r = TCL_ERROR; } else { + Package *pkgPtr; StorageChannel *inst; char name[3 + TCL_INTEGER_SPACE]; - Tcl_Channel chan; _snprintf(name, 3 + TCL_INTEGER_SPACE, "stm%ld", InterlockedIncrement(&UNIQUEID)); @@ -669,16 +726,25 @@ StorageOpenCmd(ClientData clientData, Tcl_Interp *interp, inst->grfMode = mode; inst->interp = interp; inst->watchmask = 0; + inst->flags = 0; /* bit0 set then not readable */ inst->validmask = (mode & STGM_WRITE) ? 0 : TCL_READABLE; inst->validmask |= (mode & (STGM_WRITE|STGM_READWRITE)) ? TCL_WRITABLE : 0; - chan = Tcl_CreateChannel(&StorageChannelType, name, - inst, inst->validmask); - Tcl_RegisterChannel(interp, chan); + inst->chan = Tcl_CreateChannel(&StorageChannelType, name, + inst, inst->validmask); + Tcl_RegisterChannel(interp, inst->chan); if (mode & STGM_APPEND) { - Tcl_Seek(chan, 0, SEEK_END); + Tcl_Seek(inst->chan, 0, SEEK_END); } + + /* insert at head of channels list */ + pkgPtr = Tcl_GetAssocData(interp, STORAGE_PACKAGE_KEY, NULL); + inst->pkgPtr = pkgPtr; + inst->nextPtr = pkgPtr->headPtr; + pkgPtr->headPtr = inst; + ++pkgPtr->count; + Tcl_SetObjResult(interp, Tcl_NewStringObj(name, -1)); r = TCL_OK; } @@ -915,6 +981,8 @@ StorageRemoveCmd(ClientData clientData, Tcl_Interp *interp, * StorageChannelClose - * * Called by the Tcl channel layer to close the channel. + * The channel must be removed from the linked list help + * in the Package structure. * * Results: * A standard Tcl result @@ -928,11 +996,22 @@ StorageRemoveCmd(ClientData clientData, Tcl_Interp *interp, static int StorageChannelClose(ClientData instanceData, Tcl_Interp *interp) { - StorageChannel *chan = (StorageChannel *)instanceData; + StorageChannel *instPtr = instanceData; + StorageChannel **tmpPtrPtr; + Package *pkgPtr = instPtr->pkgPtr; - if (chan->pstm) - chan->pstm->lpVtbl->Release(chan->pstm); - ckfree((char *)chan); + /* remove this channel from the package list */ + tmpPtrPtr = &pkgPtr->headPtr; + while (*tmpPtrPtr && *tmpPtrPtr != instPtr) { + tmpPtrPtr = &(*tmpPtrPtr)->nextPtr; + } + *tmpPtrPtr = instPtr->nextPtr; + --pkgPtr->count; + + /* free the stream and the memory */ + if (instPtr->pstm) + instPtr->pstm->lpVtbl->Release(instPtr->pstm); + ckfree((char *)instPtr); return TCL_OK; } @@ -1079,8 +1158,11 @@ StorageChannelWideSeek(ClientData instanceData, Tcl_WideInt offset, * * StorageChannelWatch - * - * Called by the Tcl channel layer to check that we are ready for - * file events. We are. + * Called by the Tcl channel layer when someone calls 'fileevent' on + * our channel handle. As we are always readable and writable, we + * set the watchmask flag appropriately and set the blocktime to 0 + * This allows the notified to call SetupProc and CheckProc to + * poll any of the channels from this package for events. * * Results: * None. @@ -1137,6 +1219,79 @@ StorageChannelGetHandle(ClientData instanceData, return SUCCEEDED(hr) ? TCL_OK : TCL_ERROR; } +static int +EventProc(Tcl_Event *evPtr, int flags) +{ + ChannelEvent *eventPtr = (ChannelEvent *)evPtr; + StorageChannel *chanPtr = eventPtr->instPtr; + + if (!(flags & TCL_FILE_EVENTS)) { + return 0; + } + chanPtr->flags &= ~STORAGE_FLAG_PENDING; + Tcl_NotifyChannel(chanPtr->chan, chanPtr->watchmask & eventPtr->flags); + return 1; +} + +/** + * This function is called to setup the notifier to monitor our + * channel for file events. Our CheckProc will be called anyway after some + * interval so we really only need to ensure that it is called at some + * appropriate interval. + */ + +static void +SetupProc(ClientData clientData, int flags) +{ + Package *pkgPtr = clientData; + StorageChannel *chanPtr = NULL; + int msec = 10000; + Tcl_Time blockTime = {0, 0}; + + if (!(flags & TCL_FILE_EVENTS)) { + return; + } + + for (chanPtr = pkgPtr->headPtr; chanPtr != NULL; chanPtr = chanPtr->nextPtr) { + msec = 10; + } + blockTime.sec = msec / 1000; + blockTime.usec = (msec % 1000) * 1000; + Tcl_SetMaxBlockTime(&blockTime); +} + +static void +CheckProc(ClientData clientData, int flags) +{ + Package *pkgPtr = clientData; + StorageChannel *chanPtr = NULL; + int mask; + + if (!(flags & TCL_FILE_EVENTS)) { + return; + } + + for (chanPtr = pkgPtr->headPtr; chanPtr != NULL; chanPtr = chanPtr->nextPtr) { + if (chanPtr->watchmask == 0) { + continue; + } + + /* queue an event to trigger the notifier - we use an event + * for this to avoid starving other resources + * We are always writable and readable. + */ + mask = TCL_WRITABLE | TCL_READABLE; + if (chanPtr->watchmask & mask) { + ChannelEvent *evPtr = (ChannelEvent *)ckalloc(sizeof(ChannelEvent)); + chanPtr->flags |= STORAGE_FLAG_PENDING; + evPtr->header.proc = EventProc; + evPtr->instPtr = chanPtr; + evPtr->flags = mask; + Tcl_QueueEvent((Tcl_Event *)evPtr, TCL_QUEUE_TAIL); + } + } +} + /* * ---------------------------------------------------------------------- * diff --git a/tests/tclstorage.test b/tests/tclstorage.test index 4804c6d..d77a1e1 100644 --- a/tests/tclstorage.test +++ b/tests/tclstorage.test @@ -536,6 +536,129 @@ test storage-3.9 {open sub-storage for reading - fail write} \ } \ -result {1 {error opening "test": permission denied}} +proc onRead {chan size cmd} { + set data [read $chan $size] + if {[set eof [eof $chan]]} { + fileevent $chan readable {} + set ::waiting eof + } + uplevel #0 [linsert $cmd end $eof $data] +} + +test storage-4.0 {fileevent single readable} -setup { + set stg [storage open stg40.stg w+] + set stm [$stg open test.stm w+] + puts -nonewline $stm [string repeat a 655] + close $stm + set ::size 0 + proc stg40 {eof data} {incr ::size [string length $data]} +} -body { + set stm [$stg open test.stm r] + set len [string length [read $stm]] + seek $stm 0 + fileevent $stm readable [list onRead $stm 1024 ::stg40] + set aid [after 1000 {set ::waiting timeout}] + vwait ::waiting + after cancel $aid + close $stm + list $len $::waiting $::size +} -cleanup { + $stg close + file delete -force stg40.stg + unset ::size ::waiting + rename stg40 {} +} -result {655 eof 655} + +test storage-4.1 {fileevent multiple readable} -setup { + set stg [storage open stg41.stg w+] + set stm [$stg open test.stm w+] + puts -nonewline $stm [string repeat \0\1\2\3\4 10240] + close $stm + set ::size 0 + proc stg41 {eof data} {incr ::size [string length $data]} +} -body { + set stm [$stg open test.stm r] + set len [string length [read $stm]] + seek $stm 0 + fileevent $stm readable [list onRead $stm 1024 ::stg41] + set aid [after 1000 {set ::waiting timeout}] + vwait ::waiting + after cancel $aid + close $stm + list $len $::waiting $::size +} -cleanup { + $stg close + file delete -force stg41.stg + unset ::size ::waiting + rename stg41 {} +} -result {51200 eof 51200} + +test storage-5.0 {fcopy async single} -setup { + set stg [storage open stg50.stg w+] + set stm [$stg open test.stm w+] + puts -nonewline $stm [string repeat \0\1\2\3\4 10240] + close $stm + set outfile [makeFile {} test50.check] + set ::size 0 + proc stg50 {count {err ok}} {set ::waiting $err} +} -body { + set stm [$stg open test.stm r] + fconfigure $stm -translation binary -encoding binary -eofchar {} + set out [open $outfile w] + fconfigure $out -translation binary -encoding binary -eofchar {} + set len [string length [read $stm]] + seek $stm 0 + set aid [after 1000 {set ::waiting timeout}] + fcopy $stm $out -command stg50 + vwait ::waiting + after cancel $aid + close $stm + close $out + list $len $::waiting [file size $outfile] +} -cleanup { + $stg close + file delete -force stg50.stg + unset ::size ::waiting + rename stg50 {} + removeFile $outfile +} -result {51200 ok 51200} + +test storage-5.1 {fcopy async multiple} -setup { + set stg [storage open stg51.stg w+] + set stm [$stg open test.stm w+] + puts -nonewline $stm [string repeat \0\1\2\3\4 10240] + close $stm + set outfile [makeFile {} test51.check] + set ::size 0 + proc stg51 {stm out count {err ok}} { + if {$err ne "ok" || [eof $stm]} { + set ::waiting $err + } else { + fcopy $stm $out -size 1024 -command [list stg51 $stm $out] + } + } +} -body { + set stm [$stg open test.stm r] + fconfigure $stm -translation binary -encoding binary -eofchar {} + set out [open $outfile w] + fconfigure $out -translation binary -encoding binary -eofchar {} + set len [string length [read $stm]] + seek $stm 0 + set aid [after 1000 {set ::waiting timeout}] + fcopy $stm $out -size 1024 -command [list stg51 $stm $out] + vwait ::waiting + after cancel $aid + close $stm + close $out + list $len $::waiting [file size $outfile] +} -cleanup { + $stg close + file delete -force stg51.stg + unset ::size ::waiting + rename stg51 {} + removeFile $outfile +} -result {51200 ok 51200} + # ------------------------------------------------------------------------- ::tcltest::cleanupTests