package j**a.util.concurrent;
import j**a.util.concurrent.atomic.atomicinteger;
import j**a.util.concurrent.atomic.atomicreference;
import j**a.util.concurrent.locks.locksupport;
public class exchanger
/** the corresponding thread local class */
static final class participant extends threadlocal
}private final participant participant;
private volatile node arena;
private volatile node slot;
private volatile int bound;
private final object arenaexchange(object item, boolean timed, long ns)
else if (i <= (m = (b = bound) & mmask) && q == null)
else if (spins > 0)
else if (u.getobjectvolatile(a, j) != p)
spins = spins; // releaser hasn't set match yet
else if (!t.isinterrupted() && m == 0 &&
(!timed ||
(ns = end - system.nanotime()) > 0l))
else if (u.getobjectvolatile(a, j) == p &&
u.compareandswapobject(a, j, p, null)) }}
else
p.item = null; // clear offer
}else
else if ((c = p.collides) < m || m == full ||
!u.compareandswapint(this, bound, b, b + seq + 1))
else
i = m + 1; // grow
p.index = i;}}
}private final object slotexchange(object item, boolean timed, long ns)
// create arena on contention, but continue until slot null
if (ncpu > 1 && bound == 0 &&
u.compareandswapint(this, bound, 0, seq))
arena = new node[(full + 2) << ashift];
}else if (arena != null)
return null; // caller must reroute to arenaexchange
else
}// await release
int h = p.hash;
long end = timed ? system.nanotime() + ns : 0l;
int spins = (ncpu > 1) ? spins : 1;
object v;
while ((v = p.match) == null)
else if (slot != p)
spins = spins;
else if (!t.isinterrupted() && arena == null &&
(!timed || (ns = end - system.nanotime()) > 0l))
else if (u.compareandswapobject(this, slot, p, null))
}u.putorderedobject(p, match, null);
p.item = null;
p.hash = h;
return v;
}/**
* creates a new exchanger.
*/public exchanger()
@suppresswarnings("unchecked")
public v exchange(v x) throws interruptedexception
@suppresswarnings("unchecked")
public v exchange(v x, long timeout, timeunit unit)
throws interruptedexception, timeoutexception
// unsafe mechanics
private static final sun.misc.unsafe u;
private static final long bound;
private static final long slot;
private static final long match;
private static final long blocker;
private static final int abase;
static catch (exception e)
if ((s & (s-1)) != 0 || s > (1 << ashift))
throw new error("unsupported array scale");
}}
問:你知道 j**a 的 exchanger 嗎?簡單說說其特點及應用場景?
答:exchanger 是 jdk 1.5 開始提供的乙個用於兩個工作執行緒之間交換資料的封裝工具類,簡單說就是乙個執行緒在完成一定的事務後想與另乙個執行緒交換資料,則第乙個先拿出資料的執行緒會一直等待第二個執行緒,直到第二個執行緒拿著資料到來時才能彼此交換對應資料。其定義為exchanger
泛型型別,其中 v 表示可交換的資料型別,對外提供的介面很簡單,具體如下:
可以看出,當乙個執行緒到達 exchange 呼叫點時,如果其他執行緒此前已經呼叫了此方法,則其他執行緒會被排程喚醒並與之進行物件交換,然後各自返回;如果其他執行緒還沒到達交換點,則當前執行緒會被掛起,直至其他執行緒到達才會完成交換並正常返回,或者當前執行緒被中斷或超時返回。
public class test
@override
public void run() catch (interruptedexception e) }}
}static class consumer extends thread
@override
public void run() catch (interruptedexception e)
system.out.println(getname()+" 交換後:" + data);}}
}public static void main(string args) throws interruptedexception
}
可以看到,其結果可能如下:
consumer- 交換前:0
producer- 交換前:1
consumer- 交換後:1
consumer- 交換前:0
producer- 交換後:0
producer- 交換前:2
producer- 交換後:0
consumer- 交換後:2
consumer- 交換前:0
producer- 交換前:3
producer- 交換後:0
consumer- 交換後:3
consumer- 交換前:0
producer- 交換前:4
producer- 交換後:0
consumer- 交換後:4
consumer- 交換前:0
可以看到,如上就是一種典型的使用場景,簡單理解這東西就是乙個併發協作的工具類而已。 併發工具類 Exchanger
exchanger是特別容易理解的乙個工具類,它可以在兩個執行緒之間交換資料,只能是2個執行緒,他不支援更多的執行緒之間互換資料。當執行緒1呼叫exchange物件的exchange 方法後,他會陷入阻塞狀態,直到執行緒2也呼叫了exchange 方法,然後以執行緒安全的方式交換資料,之後執行緒a和...
java併發程式設計之Exchanger
exchanger v 可以交換的物件型別 可以在對中對元素進行配對和交換的執行緒的同步點。每個執行緒將條目上的某個方法呈現給 exchange 方法,與夥伴執行緒進行匹配,並且在返回時接收其夥伴的物件。exchanger 可能被視為 synchronousqueue 的雙向形式。exchanger...
併發工具類之 Exchanger
exchanger 交換者 是乙個用於執行緒間協作的工具類。exchanger 用於進行執行緒間的資料交換。兩個執行緒通過exchange 方法交換資料,第乙個執行緒執行到exchange 方法後會一直等待第二個執行緒也執行exchange 方法,當兩個執行緒都到達同步點後,這兩個執行緒就可以交換資...