Pular para o conteúdo principal

Magia Assíncrona

Versão PDF


Como transformar objetos comuns em "objetos assíncronos"

Programação Concorrente em Erlang - A inspiração

Joe Armstrong, um dos criadores da linguagem Erlang nos mostrou uma maneira muito elegante de tratar concorrência na programação. Iremos introduzir alguns conceitos de Erlang que tangem a programação concorrente deixando de lado uma séria de sofisticações como o tratamento e propagação de erros, programação remota para focar nos conceitos mais básicos e gerais.
O primeiro comando básico de concorrência é o BIF (built in function) ``spawn''. Essa função é usada para criar um novo ``processo" (na nomenclatura de Erlang, processo seria uma linha de execução (portanto uma thread) que não compartilha dados). A assinatura da função é a seguinte: spawn(Module, Exported Function, List of Arguments). Considere o seguinte módulo (extraído do tutorial do Erlang):
--module(example1).

-export([start/0, say_something/2]).

say_something(_, 0) -> done;

say_something(What, Times)-> 
 io:format("~p~n", [What]),
 say_something(What, Times - 1).

start() ->
 spawn(example1, say_something, [hello, 3]),
 spawn(example1, say_something, [goodbye, 3]).
A função start() deste módulo bem simples cria dois processos que executa a função \verb+say_something+ com os parâmetros $[hello, 3]$ e $[goodbye,3]$. Além disso, a função devolve o id do processo criado (como veremos a frente).
1> c(example1).
{ok,example1}
2> example1:start().
hello
goodbye
<0.43.0>
hello
goodbye
hello
goodbye

Por enquanto, criamos processos que executam ações mas não escutam mensagens. Mas enviar mensagens para um objeto que não responde é como estender a mão a um cego esperando que aperte sua mão. Façamos então o processo enxegar ou ouvir mensagens. Para isso, utilizamos a estrutura receive.


A construção receive é usada para permitir que processos esperem mensagens de outros. Seu formato é:

receive
 pattern1 ->
  actions1;
 pattern2 ->
  actions2;
 ....
 patternN
  actionsN
end.
O que é um tanto quanto intuitivo. Ao receber alguma mensagem que coincida com o pattern1 o processo executará actions1 e assim sucessivamente. Outra construção igualmente importante é a forma como enviamos mensagens para um processo. Para isso usamos o operador ``!''.
Pid ! Message 
A construção acima envia a mensagem Message (que pode ser qualquer ``termo'' Erlang) para processo cujo id seja igual a Pid. A seguir, um exemplo de programa com interações (processos enviando mensagens para outros processos), ilustrando o uso das construções citadas:
-module(example2).

-export([start/0, ping/2, pong/0]).

ping(0, Pong_PID) ->
 Pong_PID ! finished,
 io:format("ping finished~n", []);

ping(N, Pong_PID) ->
 Pong_PID ! {ping, self()},
 receive
  pong -> io:format("Ping received pong~n", [])
 end,
 ping(N - 1, Pong_PID).

pong() ->
 receive
  finished -> io:format("Pong finished~n", []);
  {ping, Ping_PID} ->
   io:format("Pong received ping~n", []),
   Ping_PID ! pong, pong()
 end.
start() ->
 Pong_PID = spawn(example2, pong, []),
 spawn(example2, ping, [3, Pong_PID]).
Ao executar o código acima teremos o seguinte resultado:
1> c(example2.erl).
{ok,example2}
2> example2:start().
Pong received ping
<0.43.0>
Ping received pong
Pong received ping
Ping received pong
Pong received ping
Ping received pong
ping finished
Pong finished

Surpreendentemente simples e útil. Por trás dessas simples construções, Erlang
cria um novo processo (spawn) e uma ``caixa de mensagens'' para este processo.
Sempre que uma mensagem é enviado ao processo (Pid ! Message), a mensagem fica
guardada na caixa de mensagens até que o processo esteja disponível para recebê-lo. O processo então retira a mensagem da caixa (receive) e seleciona qual ação tomará baseado no
padrão que a mesma se encaixa.

Asynchronous Proxy Factory - Java Strike Back

Um dos grandes apelos das linguagens tipadas é a facilidade de saber quais
mensagens são entendidas por objetos dessa linguagem, e sempre que a assinatura
das mensagens enviadas não coincidirem com a interface do objeto tem-se um erro
em tempo de compilação. Isso permite a uma IDE como Eclipse fornecer dicas de
codificação no momento que o programador digita o código.
Em oposição, durante a codificação de mensagens em Erlang com a construção "Pid ! Message"
não é possível saber quais mensagens, afinal, o processo Pid escuta.
Java, não fornece estruturas simples como Erlang para mensageria mas um
complicado e também sofisticado arcabouço de concorrência java.util.concurrent.
Nossa proposta é criar uma biblioteca auxiliar que forneça os mecanismos de
concorrência para objetos que funcionem de forma ``síncrona'' usando apenas as
ferramentas já disponíveis na linguagem.

Versão OO do Ping-Pong

Para uma melhor comparação entre os conceitos de atores de Erlang e os conceitos
de Orientação a Objetos de Java, iremos fazer uma adaptação do programa
ping-pong (example2). Tomaremos o cuidado de fazer uma versão testável, quer
dizer, ao invés dos métodos escreverem String na saída padrão, eles irão
preencher um ``log de conversação'' que será analisado num teste unitário. Comecemos
pelo teste.

@Test
 public void ping(){
  StringBuffer string = new StringBuffer();
  Ping ping = new Ping(string);
  PingInterface pong = new Ping(string);
  ping.sendPing(3, pong);
  verifyPingConversation(string, 3);
 }
StringBuffer é uma versão sincronizada do StringBuilder. Isto vem a calhar
porque devemos evitar condições de corrida na escrita do log. A grosso modo,
este método é uma versão junit da função start() do programa example2.
A verificação que implementaremos apenas verifica se todas as falas foram
realizadas durante o diálogo dos atores. Inicializaremos uma mapa com as frases
e a quantidade de vezes que aquelas frases devem aparecer. Sempre que uma frase
for dita, iremos decrementar o contador no mapa até zerá-lo quando riscaremos
a frase do mapa.

private void verifyPingConversation(StringBuffer string, int i) {
  String[] messages = string.toString().split("\n");
  Map expectedResult = 
   new HashMap();
  expectedResult.put("Pong received ping", i);
  expectedResult.put("Ping received pong", i);
  expectedResult.put("Pong finished", 1);
  expectedResult.put("ping finished", 1);
  for (String string2 : messages) {
   Integer count = expectedResult.get(string2);
   count--;
   if (count ==0){
    expectedResult.remove(string2);
   } else {
    expectedResult.put(string2, count);
   }
  }
  Assert.assertTrue(expectedResult.isEmpty());
 }
Agora a versão Java do Ping. Bonus: definimos uma interface de comunicação,
eliminamos os seletores e a classe é testável.

public class Ping implements PingInterface {
 private StringBuffer conversation;


 public Ping(StringBuffer string){
  conversation = string;
 }

 public void sendPing(int n, PingInterface pid) {
  if (n ==0 ){
   pid.finished();
   say("ping finished\n");   
  } else {
   pid.ping(this);
   sendPing(n-1, pid);
  }
 }

 private void say(String string) {
  conversation.append(string);
 }

 public void finished() {
  say("Pong finished\n"); 
 }

 public void pong(PingInterface pong) {
  say("Ping received pong\n");
 }
 

 public void ping(PingInterface ping) {
  say("Pong received ping\n");
  ping.pong(this);
 }
}

Padrão Produtor-Consumidor em Java

Inicialmente, precisaremos de um sistema de mensageria como Erlang. Para
isso, teremos uma ``caixa de mensagens'' que faremos com o uso de uma
BlockingQueue (fila bloqueante do pacote java.util.concurrent). Por questões de
simplicidade utilizaremos a implementação ArrayBlockingQueue.
Deixaremos um consumidor a espera de serviços que chegarão na
``caixa de mensagens''. Este trabalhador terá uma linha de execução
independente do programa principal e um reservatório de trabalhadores para
executar cada uma das mensagens.
Para controlar o ciclo de vida do consumidor, o consumidor irá dispor de:

  • Uma tarefa kill; Ao receber a tarefa kill, o processo deve parar. A ideia é que essa seja a última tarefa a ser executada pelo processo.
  • Uma variável sentinel; caso o valor falso seja atribuido o processo pára independente se outras tarefas existam na fila;
  • um método para interrupção;
Internamente, o consumidor terá controle sobre o reservatório de trabalhadores
através da interface ExecutorService.
Abaixo, propriedades do executor conforme mencionado:

/**
  * caixa de mensagens (tarefas) do executor
  */
 protected BlockingQueue blockingQueue;

 /**
  * reservatorio de trabalhadores
  */
 protected ExecutorService executorService;

 
 // variaveis de controle do ciclo de vida
 private boolean sentinel = true;
 /**
  *  Tarefa que finaliza
  */
 private final T kill;

 /**
  * variavel de sincronizacao: quando chegar a zero, o
  *  processo terminou sua execucao
  */
 private CountDownLatch countDownLatch = new CountDownLatch(1);
O método principal do Executor, é o método público run() da interface Runnable.

@Override
 public void run() {
  try {
   // importante para interromper o loop infinito
   while (isSentinelEnable()) {
    T t = getBlockingQueue().take();
    if (t == kill) {
     kill();
     sentinel = false;
    } else {
     execute(t);
    }
   }
   // anuncia que o programa terminou
   getCountDownLatch().countDown();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 } 
Se uma mensagem kill é recebida, o método kill() será invocado para que o objeto
possa realizar algum procedimento de sanidade. Para as demais mensagens, o
método execute() é invocado.
Enquanto execute() é abstrato o método kill() possui um implementação básica
vazia.

protected abstract void execute(T t);

 protected void kill() {
 }
A classe possui ainda três métodos de razoável importância. O construtor, um
inicializador e um método de sincronização. Os demais métodos são accessors e
mutators, além de possíveis métodos para finalização.

/**
  * Instancia o executor, mas nao inicia seu processamento 
  * @param n tamanho do pool de trabalhadores
  * @param m capacidade maxima da caixa de mensagens
  * @param kill tarefa utilizada para finalizar o processamento
  */
 public Executor(int n, int m, T kill) {
  executorService = Executors.newFixedThreadPool(n);
  blockingQueue = new ArrayBlockingQueue(m);
  this.kill = kill;
 }

 /**
  * Inicia o processamento com uma nova linha de execucao
  */
 public void execute() {
  final Thread thread = new Thread(this);
  thread.start();
 }

 /**
  * aguarda o fim do processamento e finaliza o reservatorio 
  * de trabalhadores
  */
 public void awaitTermination() {
  try {
   getCountDownLatch().await();
   this.executorService.shutdown();
   this.executorService.awaitTermination(1000,
                       TimeUnit.MILLISECONDS);
  } catch (final InterruptedException e) {
   e.printStackTrace();
  }
 }

Quicksort Paralelo em Java

Façamos agora uma pausa, no desenvolvimento da solução para testar a classe
acima. Como prova de conceito iremos implementar uma versão paralela do
Quicksort que utliza o código.
A ideia básica, em implementar um quicksort paralelo usando o padrão
produtor-consumidor é receber uma tarefa com um vetor a ser ordenado
e particioná-los através de um elemento pivo (desse vetor) e
adicionar cada uma das partições como novas tarefas da fila de execução do
executor.

@Override
 public void execute(QuickSortJob t) {
  int size = t.array.length;
  if (size < thresold){
   Arrays.sort(t.array);
   t.result = t.array;
   t.array = null;
   if (t.joinToFather()){
    doneSignal.countDown();
   }
   return ;
  } 
  int rand = random.nextInt(size);
  final T comparable = t.array[rand];
  T[] rights = newInstanceArray(t);
  T[] lefts = newInstanceArray(t);
  T[] mids  = newInstanceArray(t);
  int l = 0, r = 0, m = 0;
  for (int i = 0; i < t.array.length; i++) {
    T array_element = t.array[i];
   if (array_element.compareTo(comparable) < 0){
    lefts[l++] = array_element;
    continue;
   }
   if (array_element.compareTo(comparable) > 0){
    rights[r++] = array_element;
    continue;
   }
   mids[m++] = array_element;
  }  
  BlockingQueue> blockingQueue = getBlockingQueue();
  blockingQueue.add(new QuickSortJob
   (t, QuickSortJob.LEFT, Arrays.copyOf(lefts, l)));
  t.mid = Arrays.copyOf(mids, m);
  blockingQueue.add
   (new QuickSortJob(t, 
    QuickSortJob.RIGHT, Arrays.copyOf(rights, r)));
  t.array = null;
 }
Podemos programar um certificado \footnote{algoritmos de verficação ou
certificados estão relacionados com complexidade computacional. Uma breve
explicação sobre o assunto pode nas do
Paulo Feofiloff para ordenação. Para isto criamos um vetor desordenado, executamos o ordenador e verificamos se os elementos do vetor estão ordenados.

public void testQuickSort() throws Exception { 
  Random random = new Random(); int n = 200000;  
  Integer[] problem = new Integer[n];
  for (int i =0; i < n ; i++){
      problem[i] = random.nextInt();
  }
  final long timeMilis = System.currentTimeMillis();  
  QuickSortWorker.doneSignal = new CountDownLatch(1);
  originalJob =  new QuickSortJob(null, 0, problem);  
  QuickSortWorker executor = 
    new QuickSortWorker(10, n); // executor  
  executor.getBlockingQueue().add(originalJob);
  executor.setCountDownLatch(QuickSortWorker.doneSignal);
  executor.execute();
  executor.awaitTermination();
  
  final long lastMilis = System.currentTimeMillis();
  System.out.println("Tempo Total:" + (lastMilis - timeMilis));
  
  int temp = Integer.MIN_VALUE;
  QuickSortJob job = originalJob;
  for (int i = 0; i < job.result.length; i++) {
   assertTrue(temp <= job.result[i]);
   temp = job.result[i];
  }

 }
Junit verde! Podemos prosseguir com o desenvolvimento do utilitário.


Uma versão estática

O que a construção receive faz em Erlang, um ``switch case'' poderia fazer no
template method execute. Podemos fazer melhor que isto. Em Java, é possível
fazer com que o compilador verifique se as mensagens enviadas são entendidas por
meio de sua assinatura. Para isto, iremos criar um wrapper que implementa
a (mesma) interface de um objeto alvo e transformar a mensagem bloqueante numa
assíncrona. Criaremos uma classe message com todas as informações necessárias
a execução da mensagem o próprio método (java.lang.reflec.Method), o objeto
alvo e os parâmetros da chamada.


public class Message implements Callable{
 
 Object[] parameters;
 Object target;
 Method method;
 
 public Message(Object target2, Method method2, Object[] args) {
  parameters = args;
  this.method = method2;
  this.target = target2;
 }

 public Message() {
 }

 @Override
 public Object call() throws Exception {
  return method.invoke(target, parameters);
 }

}
Agora, iremos criamos uma especialização do Executor que processa uma FutureTask.

public class SpawnExecutor extends Executor{
 public SpawnExecutor(int n, int m) {
  super(n, m, new FutureTask(new Message()));
 }
 
 @Override
 protected void execute(FutureTask task) {
  executorService.execute(task);
 }

 public void shutdown(){
  try {
   getBlockingQueue().put(getKill());
  } catch (InterruptedException e) {
  }
 }
}
Devemos encapsular o objeto Message numa FutureTask. Dessa forma, podemos
notificar os interessados sobre a conclusão de uma mensagem específica. Façamos então uma outra especialização (do Executor) que exporte estas funcionalidades além de
oferecer uma interface mais simples para o envio de mensagens.

public void send(Message message) {
  FutureTask futureTask = new FutureTask(message);
  spawnExecutor.getBlockingQueue().add(futureTask);
  notify(futureTask);
 }

 private void notify(FutureTask futureTask) {
  if (observer != null){
   observer.notify(futureTask);
  }
 }

 private Observer observer;
 
 public void setObserver(Observer observer) {
  this.observer = observer;
 }
 
 public Observer getObserver() {
  return observer;
 }
Na listagem, o efeito é conseguido com o padrão Decorator.
Temos um último problema: popular o objeto Message. Enquanto que objeto alvo e
os parâmetros são conhecidos, ainda precisamos encontrar o objeto Method apartir
do pacote de reflexão (queremos encontrar o método apartir do nome). Para isso,
faremos um objeto mais especializado que recebe uma mensagem e cria e popula um
objeto Message e o coloca na fila de trabalhos.

public abstract class AsynchronousStaticExecutor extends AsynchronousExecutor{
  protected void sendMessage(Method method, Object[] params) {
  if (parameters.length - parameterTypes.length != 0)
   return false;
  for (int i = 0; i < parameters.length; i++) {
   Object object = parameters[i];
   if (!parameterTypes[i].isAssignableFrom(
           object.getClass()))
    return false;
  }
  return true;
 }
}
Agora podemos criar a versão estática assíncrona do programa ping pong.
Mas, como de costume iremos apresentar o teste unitário para tal.

@Test
 public void testAsyncStaticPing(){
  StringBuffer string = new StringBuffer();
  Ping ping = new Ping(string);  
  AsyncPing pong = new AsyncPing(ping);
  // iniciar processo
  pong.start();
  
  ping.sendPing(3, pong);
  
  // finaliza processo
  pong.shutdown();
  pong.awaitForTermination();
  
  verifyPingConversation(string, 3);
 }
E, a classe estática.

public class AsyncPing extends AsynchronousStaticExecutor 
     implements PingInterface {

 private Ping target;

 public AsyncPing(Ping ping) {
  this.target = ping;
 }

 @Override
 protected Object getTarget() {
  return target;
 }

 @Override
 public void finished() {
  sendMessage("finished", new Object[] {});
 }

 @Override
 public void ping(PingInterface ping) {
  sendMessage("ping", new Object[] { ping });
 }

 @Override
 public void pong(PingInterface pong) {
  sendMessage("pong", new Object[] { pong });
 }
}

Java Lang Reflection

Um mago sabe a importância de um espelho. Entre as várias mágicas que o pacote
de reflexão da linguagem está o truque de criar um proxy às chamadas
de um conjunto de interfaces. Essa simples mágica talvez seja a forma mais
fácil de adicionar algum comportamento em tempo de execução para algum objeto
genérico (outras formas seriam a manipulação de byte code ou do próprio
código-fonte). Por tamanha facilidade, a API de reflexão é utilizada nas
chamadas RMI para forjar um representante local de uma classe remota. Este
representante recebe as chamadas locais e delega sua execução a entidade remota
correspondente dando a ilusão ao programador que as chamadas são realizadas localmente.
E, tudo isso graças a uma fábrica de proxies e uma interface de manipulação:

public interface InvocationHandler {
    Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable;
}
public class GenericProxyFactory {
Limitação importante: Não é possível
criar proxy para objetos sem interfaces. Para contornar o problema é
possível utlizar a refatoração: extract interface.

Aplicação: AsynchronousProxy

Vamos agora ao objetivo final desse artigo, apresentar um proxy para transformar
chamadas bloqueantes em chamadas não bloqueantes. O uso básico desse proxy e
permitir que o programador faça uma versão simples do código, sem se preocupar
com concorrência e depois, sem mudança de paradigma ou expressivade, utilizar a
concorrência.
@Test
 public void testAsyncPing() throws Exception {
  StringBuffer string = new StringBuffer();
  Ping ping = new Ping(string);
  Ping pong = new Ping(string);
  // iniciar processo
  AsynchronousProxyFactory asynchronousProxyFactory = new AsynchronousProxyFactory (1, 10, pong);
  PingInterface asyncPong = asynchronousProxyFactory.getAsynchronousProxy(PingInterface.class);
  asynchronousProxyFactory.start();
  
  ping.sendPing(3, asyncPong);
  
  // finalizar processo
  asynchronousProxyFactory.shutdown();
  asynchronousProxyFactory.awaitForTermination();
  
  verifyPingConversation(string, 3);
 }

public class AsynchronousInvocationHandler implements InvocationHandler{
 private final AsynchronousExecutor asynchronousExecutor;
 private final T target; 
 public AsynchronousInvocationHandler
   (AsynchronousExecutor asynchronousObject, T target) {
  this.asynchronousExecutor = asynchronousObject;
  this.target = target;
 }
 @Override
 public Object invoke
   (Object proxy, Method method, Object[] args) throws Throwable {
  asynchronousExecutor.send(new Message(target, method, args));
  return null; 
 }
}

public class AsynchronousProxyFactory extends AsynchronousExecutor {

 private T target;

 public AsynchronousProxyFactory(int n, int m, T obj) {
  super(n, m);
  this.target = obj;
 }

 @SuppressWarnings("unchecked")
 public T getAsynchronousProxy(Class intf) {
  return (T) Proxy.newProxyInstance(target.getClass().getClassLoader(),
    new Class[] { intf },
    new AsynchronousInvocationHandler(this, target));
 }
 
 
}

Feito!

Comentários

Postagens mais visitadas deste blog

Um texto pós-moderno - better man

Espere olhando para as horas... são 4 horas. Tem que parar. Nesse tom melancólico, começa a modesta música "better man", uma balada pop composta por Eddie Vedder ainda na adolescência. A música é a ilustração perfeita da ironia. O próprio título é irônico, uma vez que em momento algum na música aparece um better man. She lies and says she's in love with him, can't find a better man... Irônico, não!? Para começar, com a personagem central da história, a mulher que aguarda tarde da noite seu esposo... Ela chega a treinar com o espelho o fim do relacionamento. E o que faz? Diz a negação do que queria dizer. Vedder escreve músicas sobre sentimentos fortes. Sua relação com a mãe foi bastante complicada pelo o que descreve em suas canções. Na trilogia Mommy, Vedder descreve um homem perturbado com o relacionamento materno; a mãe mente para o filho sobre a identidade do pai, revela a verdade para o garoto na puberdade dizendo a ele como se parece com o verdadeiro pai e o

Pequeno manual do ócio em terras alemãs

  Pequeno manual do ócio em terras alemãs Como Lei alemã favorece aproveitadoras (e alguns aproveitadores que nunca tive o desprazer de conhecer)   Há algumas vias pelas quais pessoas de países em desenvolvimento migram para países como a Alemanha.   Por exemplo, é sabido que países desenvolvidos sofrem de escassez de mão-de-obra qualificada. Por esse motivo, países como a Alemanha dispõe vistos "especiais" para profissionais em demanda. Esse é o conceito do Blaukart (Blue Card) que na Alemanha se destina a profissionais salário anual seja superior a 55 mil euros ou 43 mil no caso de profissionais de áreas em alta demanda. Não há como recrutar essa mão-de-obra sem que a família desses profissionais também possa ser relocada. Então esses profissionais e seus familiares são relocados.   Além de se qualificar para essas vagas em demanda, ou ser parte direta da família qualificada, outra via possível para a imigração para o território alemão é através do matrimôni

O argumento anti-álcool

A lógica contra a produção do álcool é mais ou menos a seguinte: Os produtores capitalistas, produtores do combustível de humanos e máquinas irão preferir vender combustível mais caro para os mais ricos do que comida barata para os mais pobres. Máquinas e homens irão competir por combustível... Mas enquanto os ricos terão dinheiro para comprar comida e combustível o que sobrará aos pobres!? Vale lembrar que não importa se a produção é de cana ou de milho, a competição é pela terra e não pelo grão. Ainda, mesmo que o país agrícola taxe o produtor de combustível de maneira diferenciada ao produtor de comida, o governo teria maiores dificuldades em repartir o "bolo", haja vista que os governos que temos não são as instituições mais eficientes e, além do que, a comida estará mais cara. Ora, esquecem os "amigos" comunistas que a venda de biocombustível dará aos países agrícolas uma oportunidade ímpar de participar da economia mundial como protagonistas, e não meros fi