Como transformar objetos comuns em "objetos assíncronos"
Programação Concorrente em Erlang - A inspiração
Joe Armstrong, um dos criadores da linguagem Erlang nos mostrou uma maneira muito elegante de tratar concorrência na programação. Iremos introduzir alguns conceitos de Erlang que tangem a programação concorrente deixando de lado uma séria de sofisticações como o tratamento e propagação de erros, programação remota para focar nos conceitos mais básicos e gerais.O primeiro comando básico de concorrência é o BIF (built in function) ``spawn''. Essa função é usada para criar um novo ``processo" (na nomenclatura de Erlang, processo seria uma linha de execução (portanto uma thread) que não compartilha dados). A assinatura da função é a seguinte: spawn(Module, Exported Function, List of Arguments). Considere o seguinte módulo (extraído do tutorial do Erlang):
--module(example1). -export([start/0, say_something/2]). say_something(_, 0) -> done; say_something(What, Times)-> io:format("~p~n", [What]), say_something(What, Times - 1). start() -> spawn(example1, say_something, [hello, 3]), spawn(example1, say_something, [goodbye, 3]).A função start() deste módulo bem simples cria dois processos que executa a função \verb+say_something+ com os parâmetros $[hello, 3]$ e $[goodbye,3]$. Além disso, a função devolve o id do processo criado (como veremos a frente).
1> c(example1). {ok,example1} 2> example1:start(). hello goodbye <0.43.0> hello goodbye hello goodbye
Por enquanto, criamos processos que executam ações mas não escutam mensagens. Mas enviar mensagens para um objeto que não responde é como estender a mão a um cego esperando que aperte sua mão. Façamos então o processo enxegar ou ouvir mensagens. Para isso, utilizamos a estrutura receive.
A construção receive é usada para permitir que processos esperem mensagens de outros. Seu formato é:
receive pattern1 -> actions1; pattern2 -> actions2; .... patternN actionsN end.O que é um tanto quanto intuitivo. Ao receber alguma mensagem que coincida com o pattern1 o processo executará actions1 e assim sucessivamente. Outra construção igualmente importante é a forma como enviamos mensagens para um processo. Para isso usamos o operador ``!''.
Pid ! MessageA construção acima envia a mensagem Message (que pode ser qualquer ``termo'' Erlang) para processo cujo id seja igual a Pid. A seguir, um exemplo de programa com interações (processos enviando mensagens para outros processos), ilustrando o uso das construções citadas:
-module(example2). -export([start/0, ping/2, pong/0]). ping(0, Pong_PID) -> Pong_PID ! finished, io:format("ping finished~n", []); ping(N, Pong_PID) -> Pong_PID ! {ping, self()}, receive pong -> io:format("Ping received pong~n", []) end, ping(N - 1, Pong_PID). pong() -> receive finished -> io:format("Pong finished~n", []); {ping, Ping_PID} -> io:format("Pong received ping~n", []), Ping_PID ! pong, pong() end. start() -> Pong_PID = spawn(example2, pong, []), spawn(example2, ping, [3, Pong_PID]).Ao executar o código acima teremos o seguinte resultado:
1> c(example2.erl). {ok,example2} 2> example2:start(). Pong received ping <0.43.0> Ping received pong Pong received ping Ping received pong Pong received ping Ping received pong ping finished Pong finished
Surpreendentemente simples e útil. Por trás dessas simples construções, Erlang
cria um novo processo (spawn) e uma ``caixa de mensagens'' para este processo.
Sempre que uma mensagem é enviado ao processo (Pid ! Message), a mensagem fica
guardada na caixa de mensagens até que o processo esteja disponível para recebê-lo. O processo então retira a mensagem da caixa (receive) e seleciona qual ação tomará baseado no
padrão que a mesma se encaixa.
Asynchronous Proxy Factory - Java Strike Back
Um dos grandes apelos das linguagens tipadas é a facilidade de saber quaismensagens são entendidas por objetos dessa linguagem, e sempre que a assinatura
das mensagens enviadas não coincidirem com a interface do objeto tem-se um erro
em tempo de compilação. Isso permite a uma IDE como Eclipse fornecer dicas de
codificação no momento que o programador digita o código.
Em oposição, durante a codificação de mensagens em Erlang com a construção "Pid ! Message"
não é possível saber quais mensagens, afinal, o processo Pid escuta.
Java, não fornece estruturas simples como Erlang para mensageria mas um
complicado e também sofisticado arcabouço de concorrência java.util.concurrent.
Nossa proposta é criar uma biblioteca auxiliar que forneça os mecanismos de
concorrência para objetos que funcionem de forma ``síncrona'' usando apenas as
ferramentas já disponíveis na linguagem.
Versão OO do Ping-Pong
Para uma melhor comparação entre os conceitos de atores de Erlang e os conceitosde Orientação a Objetos de Java, iremos fazer uma adaptação do programa
ping-pong (example2). Tomaremos o cuidado de fazer uma versão testável, quer
dizer, ao invés dos métodos escreverem String na saída padrão, eles irão
preencher um ``log de conversação'' que será analisado num teste unitário. Comecemos
pelo teste.
@Test public void ping(){ StringBuffer string = new StringBuffer(); Ping ping = new Ping(string); PingInterface pong = new Ping(string); ping.sendPing(3, pong); verifyPingConversation(string, 3); }StringBuffer é uma versão sincronizada do StringBuilder. Isto vem a calhar
porque devemos evitar condições de corrida na escrita do log. A grosso modo,
este método é uma versão junit da função start() do programa example2.
A verificação que implementaremos apenas verifica se todas as falas foram
realizadas durante o diálogo dos atores. Inicializaremos uma mapa com as frases
e a quantidade de vezes que aquelas frases devem aparecer. Sempre que uma frase
for dita, iremos decrementar o contador no mapa até zerá-lo quando riscaremos
a frase do mapa.
private void verifyPingConversation(StringBuffer string, int i) { String[] messages = string.toString().split("\n"); MapAgora a versão Java do Ping. Bonus: definimos uma interface de comunicação,expectedResult = new HashMap (); expectedResult.put("Pong received ping", i); expectedResult.put("Ping received pong", i); expectedResult.put("Pong finished", 1); expectedResult.put("ping finished", 1); for (String string2 : messages) { Integer count = expectedResult.get(string2); count--; if (count ==0){ expectedResult.remove(string2); } else { expectedResult.put(string2, count); } } Assert.assertTrue(expectedResult.isEmpty()); }
eliminamos os seletores e a classe é testável.
public class Ping implements PingInterface { private StringBuffer conversation; public Ping(StringBuffer string){ conversation = string; } public void sendPing(int n, PingInterface pid) { if (n ==0 ){ pid.finished(); say("ping finished\n"); } else { pid.ping(this); sendPing(n-1, pid); } } private void say(String string) { conversation.append(string); } public void finished() { say("Pong finished\n"); } public void pong(PingInterface pong) { say("Ping received pong\n"); } public void ping(PingInterface ping) { say("Pong received ping\n"); ping.pong(this); } }
Padrão Produtor-Consumidor em Java
Inicialmente, precisaremos de um sistema de mensageria como Erlang. Paraisso, teremos uma ``caixa de mensagens'' que faremos com o uso de uma
BlockingQueue (fila bloqueante do pacote java.util.concurrent). Por questões de
simplicidade utilizaremos a implementação ArrayBlockingQueue.
Deixaremos um consumidor a espera de serviços que chegarão na
``caixa de mensagens''. Este trabalhador terá uma linha de execução
independente do programa principal e um reservatório de trabalhadores para
executar cada uma das mensagens.
Para controlar o ciclo de vida do consumidor, o consumidor irá dispor de:
- Uma tarefa kill; Ao receber a tarefa kill, o processo deve parar. A ideia é que essa seja a última tarefa a ser executada pelo processo.
- Uma variável sentinel; caso o valor falso seja atribuido o processo pára independente se outras tarefas existam na fila;
- um método para interrupção;
através da interface ExecutorService.
Abaixo, propriedades do executor conforme mencionado:
/** * caixa de mensagens (tarefas) do executor */ protected BlockingQueueO método principal do Executor, é o método público run() da interface Runnable.blockingQueue; /** * reservatorio de trabalhadores */ protected ExecutorService executorService; // variaveis de controle do ciclo de vida private boolean sentinel = true; /** * Tarefa que finaliza */ private final T kill; /** * variavel de sincronizacao: quando chegar a zero, o * processo terminou sua execucao */ private CountDownLatch countDownLatch = new CountDownLatch(1);
@Override public void run() { try { // importante para interromper o loop infinito while (isSentinelEnable()) { T t = getBlockingQueue().take(); if (t == kill) { kill(); sentinel = false; } else { execute(t); } } // anuncia que o programa terminou getCountDownLatch().countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }Se uma mensagem kill é recebida, o método kill() será invocado para que o objeto
possa realizar algum procedimento de sanidade. Para as demais mensagens, o
método execute() é invocado.
Enquanto execute() é abstrato o método kill() possui um implementação básica
vazia.
protected abstract void execute(T t); protected void kill() { }A classe possui ainda três métodos de razoável importância. O construtor, um
inicializador e um método de sincronização. Os demais métodos são accessors e
mutators, além de possíveis métodos para finalização.
/** * Instancia o executor, mas nao inicia seu processamento * @param n tamanho do pool de trabalhadores * @param m capacidade maxima da caixa de mensagens * @param kill tarefa utilizada para finalizar o processamento */ public Executor(int n, int m, T kill) { executorService = Executors.newFixedThreadPool(n); blockingQueue = new ArrayBlockingQueue(m); this.kill = kill; } /** * Inicia o processamento com uma nova linha de execucao */ public void execute() { final Thread thread = new Thread(this); thread.start(); } /** * aguarda o fim do processamento e finaliza o reservatorio * de trabalhadores */ public void awaitTermination() { try { getCountDownLatch().await(); this.executorService.shutdown(); this.executorService.awaitTermination(1000, TimeUnit.MILLISECONDS); } catch (final InterruptedException e) { e.printStackTrace(); } }
Quicksort Paralelo em Java
Façamos agora uma pausa, no desenvolvimento da solução para testar a classeacima. Como prova de conceito iremos implementar uma versão paralela do
Quicksort que utliza o código.
A ideia básica, em implementar um quicksort paralelo usando o padrão
produtor-consumidor é receber uma tarefa com um vetor a ser ordenado
e particioná-los através de um elemento pivo (desse vetor) e
adicionar cada uma das partições como novas tarefas da fila de execução do
executor.
@Override public void execute(QuickSortJobPodemos programar um certificado \footnote{algoritmos de verficação out) { int size = t.array.length; if (size < thresold){ Arrays.sort(t.array); t.result = t.array; t.array = null; if (t.joinToFather()){ doneSignal.countDown(); } return ; } int rand = random.nextInt(size); final T comparable = t.array[rand]; T[] rights = newInstanceArray(t); T[] lefts = newInstanceArray(t); T[] mids = newInstanceArray(t); int l = 0, r = 0, m = 0; for (int i = 0; i < t.array.length; i++) { T array_element = t.array[i]; if (array_element.compareTo(comparable) < 0){ lefts[l++] = array_element; continue; } if (array_element.compareTo(comparable) > 0){ rights[r++] = array_element; continue; } mids[m++] = array_element; } BlockingQueue > blockingQueue = getBlockingQueue(); blockingQueue.add(new QuickSortJob (t, QuickSortJob.LEFT, Arrays.copyOf(lefts, l))); t.mid = Arrays.copyOf(mids, m); blockingQueue.add (new QuickSortJob (t, QuickSortJob.RIGHT, Arrays.copyOf(rights, r))); t.array = null; }
certificados estão relacionados com complexidade computacional. Uma breve
explicação sobre o assunto pode nas do
Paulo Feofiloff para ordenação. Para isto criamos um vetor desordenado, executamos o ordenador e verificamos se os elementos do vetor estão ordenados.
public void testQuickSort() throws Exception { Random random = new Random(); int n = 200000; Integer[] problem = new Integer[n]; for (int i =0; i < n ; i++){ problem[i] = random.nextInt(); } final long timeMilis = System.currentTimeMillis(); QuickSortWorker.doneSignal = new CountDownLatch(1); originalJob = new QuickSortJobJunit verde! Podemos prosseguir com o desenvolvimento do utilitário.(null, 0, problem); QuickSortWorker executor = new QuickSortWorker (10, n); // executor executor.getBlockingQueue().add(originalJob); executor.setCountDownLatch(QuickSortWorker.doneSignal); executor.execute(); executor.awaitTermination(); final long lastMilis = System.currentTimeMillis(); System.out.println("Tempo Total:" + (lastMilis - timeMilis)); int temp = Integer.MIN_VALUE; QuickSortJob job = originalJob; for (int i = 0; i < job.result.length; i++) { assertTrue(temp <= job.result[i]); temp = job.result[i]; } }
Uma versão estática
O que a construção receive faz em Erlang, um ``switch case'' poderia fazer notemplate method execute. Podemos fazer melhor que isto. Em Java, é possível
fazer com que o compilador verifique se as mensagens enviadas são entendidas por
meio de sua assinatura. Para isto, iremos criar um wrapper que implementa
a (mesma) interface de um objeto alvo e transformar a mensagem bloqueante numa
assíncrona. Criaremos uma classe message com todas as informações necessárias
a execução da mensagem o próprio método (java.lang.reflec.Method), o objeto
alvo e os parâmetros da chamada.
public class Message implements Callable{ Object[] parameters; Object target; Method method; public Message(Object target2, Method method2, Object[] args) { parameters = args; this.method = method2; this.target = target2; } public Message() { } @Override public Object call() throws Exception { return method.invoke(target, parameters); } }Agora, iremos criamos uma especialização do Executor que processa uma FutureTask.
public class SpawnExecutor extends ExecutorDevemos encapsular o objeto Message numa FutureTask. Dessa forma, podemos{ public SpawnExecutor(int n, int m) { super(n, m, new FutureTask(new Message())); } @Override protected void execute(FutureTask task) { executorService.execute(task); } public void shutdown(){ try { getBlockingQueue().put(getKill()); } catch (InterruptedException e) { } } }
notificar os interessados sobre a conclusão de uma mensagem específica. Façamos então uma outra especialização (do Executor) que exporte estas funcionalidades além de
oferecer uma interface mais simples para o envio de mensagens.
public void send(Message message) { FutureTask futureTask = new FutureTask(message); spawnExecutor.getBlockingQueue().add(futureTask); notify(futureTask); } private void notify(FutureTask futureTask) { if (observer != null){ observer.notify(futureTask); } } private Observer observer; public void setObserver(Observer observer) { this.observer = observer; } public Observer getObserver() { return observer; }Na listagem, o efeito é conseguido com o padrão Decorator.
Temos um último problema: popular o objeto Message. Enquanto que objeto alvo e
os parâmetros são conhecidos, ainda precisamos encontrar o objeto Method apartir
do pacote de reflexão (queremos encontrar o método apartir do nome). Para isso,
faremos um objeto mais especializado que recebe uma mensagem e cria e popula um
objeto Message e o coloca na fila de trabalhos.
public abstract class AsynchronousStaticExecutor extends AsynchronousExecutor{ protected void sendMessage(Method method, Object[] params) { if (parameters.length - parameterTypes.length != 0) return false; for (int i = 0; i < parameters.length; i++) { Object object = parameters[i]; if (!parameterTypes[i].isAssignableFrom( object.getClass())) return false; } return true; } }Agora podemos criar a versão estática assíncrona do programa ping pong.
Mas, como de costume iremos apresentar o teste unitário para tal.
@Test public void testAsyncStaticPing(){ StringBuffer string = new StringBuffer(); Ping ping = new Ping(string); AsyncPing pong = new AsyncPing(ping); // iniciar processo pong.start(); ping.sendPing(3, pong); // finaliza processo pong.shutdown(); pong.awaitForTermination(); verifyPingConversation(string, 3); }E, a classe estática.
public class AsyncPing extends AsynchronousStaticExecutor implements PingInterface { private Ping target; public AsyncPing(Ping ping) { this.target = ping; } @Override protected Object getTarget() { return target; } @Override public void finished() { sendMessage("finished", new Object[] {}); } @Override public void ping(PingInterface ping) { sendMessage("ping", new Object[] { ping }); } @Override public void pong(PingInterface pong) { sendMessage("pong", new Object[] { pong }); } }
Java Lang Reflection
Um mago sabe a importância de um espelho. Entre as várias mágicas que o pacotede reflexão da linguagem está o truque de criar um proxy às chamadas
de um conjunto de interfaces. Essa simples mágica talvez seja a forma mais
fácil de adicionar algum comportamento em tempo de execução para algum objeto
genérico (outras formas seriam a manipulação de byte code ou do próprio
código-fonte). Por tamanha facilidade, a API de reflexão é utilizada nas
chamadas RMI para forjar um representante local de uma classe remota. Este
representante recebe as chamadas locais e delega sua execução a entidade remota
correspondente dando a ilusão ao programador que as chamadas são realizadas localmente.
E, tudo isso graças a uma fábrica de proxies e uma interface de manipulação:
public interface InvocationHandler { Object invoke(Object proxy, Method method, Object[] args) throws Throwable; }
public class GenericProxyFactory {Limitação importante: Não é possível
criar proxy para objetos sem interfaces. Para contornar o problema é
possível utlizar a refatoração: extract interface.
Aplicação: AsynchronousProxy
Vamos agora ao objetivo final desse artigo, apresentar um proxy para transformarchamadas bloqueantes em chamadas não bloqueantes. O uso básico desse proxy e
permitir que o programador faça uma versão simples do código, sem se preocupar
com concorrência e depois, sem mudança de paradigma ou expressivade, utilizar a
concorrência.
@Test public void testAsyncPing() throws Exception { StringBuffer string = new StringBuffer(); Ping ping = new Ping(string); Ping pong = new Ping(string); // iniciar processo AsynchronousProxyFactory asynchronousProxyFactory = new AsynchronousProxyFactory (1, 10, pong); PingInterface asyncPong = asynchronousProxyFactory.getAsynchronousProxy(PingInterface.class); asynchronousProxyFactory.start(); ping.sendPing(3, asyncPong); // finalizar processo asynchronousProxyFactory.shutdown(); asynchronousProxyFactory.awaitForTermination(); verifyPingConversation(string, 3); } public class AsynchronousInvocationHandler implements InvocationHandler{ private final AsynchronousExecutor asynchronousExecutor; private final T target; public AsynchronousInvocationHandler (AsynchronousExecutor asynchronousObject, T target) { this.asynchronousExecutor = asynchronousObject; this.target = target; } @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { asynchronousExecutor.send(new Message(target, method, args)); return null; } } public class AsynchronousProxyFactory extends AsynchronousExecutor { private T target; public AsynchronousProxyFactory(int n, int m, T obj) { super(n, m); this.target = obj; } @SuppressWarnings("unchecked") public T getAsynchronousProxy(Classintf) { return (T) Proxy.newProxyInstance(target.getClass().getClassLoader(), new Class[] { intf }, new AsynchronousInvocationHandler(this, target)); } }
Feito!
Comentários