Single-threaded design philosophy
The NxCore API was designed to be single-threaded, and
internally uses thread-specific (thread local storage, or TLS) data structures
to allow complete isolation between multiple threads running the API.
Each of your multiple NxCore processing threads is completely isolated from
others.
Each API call you make works on the data within your thread.
All API calls are deterministic and synchronous.
Parallel processing
The single-threaded design allows you to have a large
number of independent threads processing different tapes at the same time, with
zero synchronization requirements.
Just start a thread, have that thread call ProcessTape(), and that thread will
process that tape without you worrying about any further
threading/synchronization issues.
Use of global variables
The only global variable that's safe to use in multiple
NxCore threads is NxCoreClass.
While you can use __declspec(thread) for your global variables, a better way is
to create a struct that will contain all your global data, and pass a pointer
to that structure in the UserData field of ProcessTape().
Look at the example below -- the PerThreadData structure is used to simulate
global variables.
Lifetime of NxCore data structures
All NxCore data structures are allocated at the beginning of
ProcessTape, and destroyed before ProcessTape returns.
NxString values remain constant, but virtually all other structures get
overwritten on every callback.
If you're looking to use your own synchronization mechanisms, you'll need to
manage copying the data off yourself.
Splitting of a tape across multiple threads
A single tape or stream can be processed on multiple threads. The playback is not synchronized between threads. Using ExcludeOpraExch allows for filtering of the playback.
Here's an example of a very simple, thread-safe, multiple-tape message counter:
#define _CRT_SECURE_NO_WARNINGS #include <thread> #include <stdio.h> #include "NxCoreAPI.h" #include "NxCoreAPI_Wrapper_C++.h" NxCoreClass NxCore; struct PerThreadData { __int64 counter; int rc; int time; const char* tape; PerThreadData() : counter(0), rc(0), time(0), tape(0) {} }; PerThreadData* threadData; static int __stdcall nxCoreCallback(const NxCoreSystem* pNxCoreSys, const NxCoreMessage* pNxCoreMessage) { PerThreadData* pThreadData = &threadData[pNxCoreSys->UserData]; pThreadData->counter++; if ( pNxCoreMessage->MessageType==NxMSG_STATUS && pNxCoreSys->ClockUpdateInterval >= NxCLOCK_MINUTE ) { printf("Tape Date: %02d/%02d/%d Tape Time: %02d:%02d:%02d\n", pNxCoreSys->nxDate.Month,pNxCoreSys->nxDate.Day,pNxCoreSys->nxDate.Year, pNxCoreSys->nxTime.Hour,pNxCoreSys->nxTime.Minute,pNxCoreSys->nxTime.Second); } return NxCALLBACKRETURN_CONTINUE; } static void thProcess(size_t threadIx) { PerThreadData *pThreadData = (PerThreadData*) &threadData[threadIx]; long startTick = GetTickCount(); pThreadData->rc = NxCore.ProcessTape(pThreadData->tape, 0, NxCF_EXCLUDE_CRC_CHECK, (int) threadIx, nxCoreCallback); pThreadData->time = GetTickCount() - startTick; } int main(int argc, char** argv) { if (argc == 1) { printf("%s tape1 [tape2 tape3 ...]\n", argv[0]); return -1; } if (!NxCore.LoadNxCore("NxCoreAPI64.dll") && !NxCore.LoadNxCore("NxCoreAPI.dll")) { printf("loading library failed\n"); return -1; } const int numTapes = argc - 1; std::thread* threads = new std::thread[numTapes]; threadData = new PerThreadData[numTapes]; for (int i = 0; i < numTapes; i++) { threadData[i].tape = argv[i + 1]; printf("Start Tape: %s\n",threadData[i].tape); threads[i] = std::thread(thProcess, i); } for (int i = 0; i < numTapes; i++) threads[i].join(); for (int i = 0; i < numTapes; i++) { const PerThreadData& ptd = threadData[i]; printf("Tape[%s] Count[%I64d] RC[%ld] Millis[%ld]\n", ptd.tape, ptd.counter, ptd.rc, ptd.time); } delete threads; delete threadData; return 0; } |
import net.nanex.NxCoreClass; // Java does not have Global variables. // Instead of passing an index of a global array in UserData // thread statistics will be tracked in NxCoreThread objects class MultithreadingSample { public static void main(String args[]) throws InterruptedException { NxCoreThread nxThreads[] = new NxCoreThread[args.length]; Thread threads[] = new Thread[args.length]; //create NxCoreThread objects using tape name for(int i = 0; i < args.length; i++) nxThreads[i] = new NxCoreThread(args[i]); //create java threads using NxCoreThreads object for(int i = 0; i < args.length; i++) threads[i] = new Thread(nxThreads[i]); //start threads for(Thread thread : threads) thread.start(); //wait for threads to finish for(Thread thread : threads) thread.join(); //print results stored in NxCoreThread objects for(NxCoreThread nxThread : nxThreads) System.out.println(String.format("Tape[%s] Count[%d] RC[%d] Millis[%d]\n", nxThread.tape, nxThread.counter, nxThread.rc, nxThread.time)); } } class NxCoreThread extends NxCoreClass implements Runnable { long counter = 0; int rc = -1; int time = 0; String tape; public NxCoreThread(String tapePath) { this.tape = tapePath; } public void run() { if (LoadNxCore("NxCoreAPI64.dll") != 0){ long startTick = System.currentTimeMillis(); rc = ProcessTape(tape, 0, defines.NxCF_EXCLUDE_CRC_CHECK, 0); time = (int)(System.currentTimeMillis() - startTick); } else System.out.println("loading library failed"); } @Override public int OnNxCoreCallback( NxCoreSystem nxCoreSys, NxCoreMessage nxCoreMsg) { counter++; if (nxCoreMsg.MessageType == defines.NxMSG_STATUS) { if (nxCoreSys.ClockUpdateInterval >= defines.NxCLOCK_MINUTE) { int year = nxCoreSys.nxDate.Year; int month = nxCoreSys.nxDate.Month; int day = nxCoreSys.nxDate.Day; int hour = nxCoreSys.nxTime.Hour; int minute = nxCoreSys.nxTime.Minute; int second = nxCoreSys.nxTime.Second; System.out.println(String.format("Tape Date: %02d/%02d/%d Tape Time: %02d:%02d:%02d", month, day, year, 
hour, minute, second)); } } return defines.NxCALLBACKRETURN_CONTINUE; } } |
import NxCore
from multiprocessing import Process
import time

# Populate this list with the tapes to process.
tapeArray = ["tape1.nx2", "tape2.nx2"]

# Each tape is processed in its own Process, so every process works on its own
# copy of this module-level counter; no sharing between tapes takes place.
counter = 0


def OnNxCoreCallback(NxCoreSys, NxCoreMsg):
    """Count every message and print the tape clock on minute status updates."""
    global counter
    counter += 1

    # Only look at the clock on status messages, as the C++ and Java versions
    # of this sample do.
    if (NxCoreMsg.MessageType == NxCore.NxMSG_STATUS
            and NxCoreSys.ClockUpdateInterval >= NxCore.NxCLOCK_MINUTE):
        year = NxCoreSys.nxDate.Year
        month = NxCoreSys.nxDate.Month
        day = NxCoreSys.nxDate.Day
        hour = NxCoreSys.nxTime.Hour
        minute = NxCoreSys.nxTime.Minute
        print("{:02d}/{:02d}/{:04d} {:02d}:{:02d}"
              .format(month, day, year, hour, minute))

    return NxCore.NxCALLBACKRETURN_CONTINUE


def NxCoreThread(tapePath, i):
    """Process one tape in the current process and print its summary line.

    tapePath -- path of the tape to process
    i        -- value passed to ProcessTape as UserData
    """
    global counter

    t0 = time.time()
    print("tape:" + tapePath)

    if NxCore.LoadNxCore("NxCoreAPI64.dll"):
        rc = NxCore.ProcessTape(tapePath, 0, NxCore.NxCF_EXCLUDE_CRC_CHECK,
                                i, OnNxCoreCallback)
        elapsed_ms = round((time.time() - t0) * 1000)
        print("Tape[{}] Count[{}] RC[{}] Millis[{}]"
              .format(tapePath, counter, rc, elapsed_ms))
    else:
        print("loading library failed")


if __name__ == "__main__":
    # One process per tape: create them all, start them all, then wait for all.
    threads = []
    for i, tapePath in enumerate(tapeArray):
        print("tape:" + tapePath)
        threads.append(Process(target=NxCoreThread, args=(tapePath, i)))

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()